diff --git a/clients/src/main/java/kafka/clients/producer/Callback.java b/clients/src/main/java/kafka/clients/producer/Callback.java index 47e5af3..d287d78 100644 --- a/clients/src/main/java/kafka/clients/producer/Callback.java +++ b/clients/src/main/java/kafka/clients/producer/Callback.java @@ -2,14 +2,17 @@ package kafka.clients.producer; /** * A callback interface that the user can implement to allow code to execute when the request is complete. This callback - * will execute in the background I/O thread so it should be fast. + * will generally execute in the background I/O thread so it should be fast. */ public interface Callback { /** - * A callback method the user should implement. This method will be called when the send to the server has - * completed. - * @param send The results of the call. This send is guaranteed to be completed so none of its methods will block. + * A callback method the user can implement to provide asynchronous handling of request completion. This method will + * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be + * non-null. + * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error + * occurred. + * @param exception The exception thrown during processing of this record. Null if no error occurred. 
*/ - public void onCompletion(RecordSend send); + public void onCompletion(RecordMetadata metadata, Exception exception); } diff --git a/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java b/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java deleted file mode 100644 index b82fcfb..0000000 --- a/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java +++ /dev/null @@ -1,35 +0,0 @@ -package kafka.clients.producer; - -import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; - -import kafka.common.Cluster; -import kafka.common.utils.Utils; - -/** - * A simple partitioning strategy that will work for messages with or without keys. - *

- * If there is a partition key specified in the record the partitioner will use that for partitioning. Otherwise, if - * there there is no partitionKey but there is a normal key that will be used. If neither key is specified the - * partitioner will round-robin over partitions in the topic. - *

- * For the cases where there is some key present the partition is computed based on the murmur2 hash of the serialized - * key. - */ -public class DefaultPartitioner implements Partitioner { - - private final AtomicInteger counter = new AtomicInteger(new Random().nextInt()); - - /** - * Compute the partition - */ - @Override - public int partition(ProducerRecord record, byte[] key, byte[] partitionKey, byte[] value, Cluster cluster, int numPartitions) { - byte[] keyToUse = partitionKey != null ? partitionKey : key; - if (keyToUse == null) - return Utils.abs(counter.getAndIncrement()) % numPartitions; - else - return Utils.abs(Utils.murmur2(keyToUse)) % numPartitions; - } - -} diff --git a/clients/src/main/java/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/kafka/clients/producer/KafkaProducer.java index 58eee0c..1dd63fc 100644 --- a/clients/src/main/java/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/kafka/clients/producer/KafkaProducer.java @@ -6,17 +6,22 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import kafka.clients.producer.internals.FutureRecordMetadata; import kafka.clients.producer.internals.Metadata; +import kafka.clients.producer.internals.Partitioner; import kafka.clients.producer.internals.RecordAccumulator; import kafka.clients.producer.internals.Sender; import kafka.common.Cluster; import kafka.common.KafkaException; import kafka.common.Metric; -import kafka.common.Serializer; +import kafka.common.PartitionInfo; import kafka.common.TopicPartition; import kafka.common.config.ConfigException; -import kafka.common.errors.MessageTooLargeException; +import kafka.common.errors.RecordTooLargeException; import kafka.common.metrics.JmxReporter; import kafka.common.metrics.MetricConfig; import kafka.common.metrics.Metrics; @@ -29,24 
+34,22 @@ import kafka.common.utils.KafkaThread; import kafka.common.utils.SystemTime; /** - * A Kafka producer that can be used to send data to the Kafka cluster. + * A Kafka client that publishes records to the Kafka cluster. *

* The producer is thread safe and should generally be shared among all threads for best performance. *

* The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it - * needs to communicate with. Failure to close the producer after use will leak these. + * needs to communicate with. Failure to close the producer after use will leak these resources. */ public class KafkaProducer implements Producer { + private final Partitioner partitioner; private final int maxRequestSize; private final long metadataFetchTimeoutMs; private final long totalMemorySize; - private final Partitioner partitioner; private final Metadata metadata; private final RecordAccumulator accumulator; private final Sender sender; - private final Serializer keySerializer; - private final Serializer valueSerializer; private final Metrics metrics; private final Thread ioThread; @@ -72,9 +75,7 @@ public class KafkaProducer implements Producer { this.metrics = new Metrics(new MetricConfig(), Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")), new SystemTime()); - this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class); - this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class); - this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); + this.partitioner = new Partitioner(); this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); this.metadata = new Metadata(); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); @@ -126,78 +127,84 @@ public class KafkaProducer implements Producer { * Asynchronously send a record to a topic. 
Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)} */ @Override - public RecordSend send(ProducerRecord record) { + public Future send(ProducerRecord record) { return send(record, null); } /** * Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged. *

- * The send is asynchronous and this method will return immediately once the record has been serialized and stored - * in the buffer of messages waiting to be sent. This allows sending many records in parallel without necessitating - * blocking to wait for the response after each one. + * The send is asynchronous and this method will return immediately once the record has been stored in the buffer of + * records waiting to be sent. This allows sending many records in parallel without blocking to wait for the + * response after each one. + *

+ * The result of the send is a {@link RecordMetadata} specifying the partition the record was sent to and the offset + * it was assigned. + *

+ * Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the + * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get() + * get()} on this future will result in the metadata for the record or throw any exception that occurred while + * sending the record. *

- * The {@link RecordSend} returned by this call will hold the future response data including the offset assigned to - * the message and the error (if any) when the request has completed (or returned an error), and this object can be - * used to block awaiting the response. If you want the equivalent of a simple blocking send you can easily achieve - * that using the {@link kafka.clients.producer.RecordSend#await() await()} method on the {@link RecordSend} this - * call returns: + * If you want to simulate a simple blocking call you can do the following: * *

-     *   ProducerRecord record = new ProducerRecord("the-topic", "key, "value");
-     *   producer.send(myRecord, null).await();
+     *   producer.send(new ProducerRecord("the-topic", "key", "value")).get();
      * 
- * - * Note that the send method will not throw an exception if the request fails while communicating with the cluster, - * rather that exception will be thrown when accessing the {@link RecordSend} that is returned. *

* Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that - * will be invoked when the request is complete. Note that the callback will execute in the I/O thread of the - * producer and so should be reasonably fast. An example usage of an inline callback would be the following: + * will be invoked when the request is complete. * *

      *   ProducerRecord record = new ProducerRecord("the-topic", "key, "value");
      *   producer.send(myRecord,
      *                 new Callback() {
-     *                     public void onCompletion(RecordSend send) {
-     *                         try {
-     *                             System.out.println("The offset of the message we just sent is: " + send.offset());
-     *                         } catch(KafkaException e) {
+     *                     public void onCompletion(RecordMetadata metadata, Exception e) {
+     *                         if(e != null)
      *                             e.printStackTrace();
-     *                         }
+     *                         else System.out.println("The offset of the record we just sent is: " + metadata.offset());
      *                     }
      *                 });
      * 
+ * + * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the + * following example callback1 is guaranteed to execute before callback2: + * + *
+     * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
+     * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
+     * 
*

- * This call enqueues the message in the buffer of outgoing messages to be sent. This buffer has a hard limit on - * it's size controlled by the configuration total.memory.bytes. If send() is called - * faster than the I/O thread can send data to the brokers we will eventually run out of buffer space. The default - * behavior in this case is to block the send call until the I/O thread catches up and more buffer space is - * available. However if non-blocking usage is desired the setting block.on.buffer.full=false will - * cause the producer to instead throw an exception when this occurs. + * Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or + * they will delay the sending of messages from other threads. If you want to execute blocking or computationally + * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body + * to parallelize processing. + *

+ * The producer manages a buffer of records waiting to be sent. This buffer has a hard limit on it's size, which is + * controlled by the configuration total.memory.bytes. If send() is called faster than the + * I/O thread can transfer data to the brokers the buffer will eventually run out of space. The default behavior in + * this case is to block the send call until the I/O thread catches up and more buffer space is available. However + * in cases where non-blocking usage is desired the setting block.on.buffer.full=false will cause the + * producer to instead throw an exception when buffer memory is exhausted. * * @param record The record to send * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null * indicates no callback) - * @throws BufferExhausedException This exception is thrown if the buffer is full and blocking has been disabled. - * @throws MessageTooLargeException This exception is thrown if the serialized size of the message is larger than - * the maximum buffer memory or maximum request size that has been configured (whichever is smaller). 
*/ @Override - public RecordSend send(ProducerRecord record, Callback callback) { - Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs); - byte[] key = keySerializer.toBytes(record.key()); - byte[] value = valueSerializer.toBytes(record.value()); - byte[] partitionKey = keySerializer.toBytes(record.partitionKey()); - int partition = partitioner.partition(record, key, partitionKey, value, cluster, cluster.partitionsFor(record.topic()).size()); - ensureValidSize(key, value); + public Future send(ProducerRecord record, Callback callback) { try { + Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs); + int partition = partitioner.partition(record, cluster); + ensureValidSize(record.key(), record.value()); TopicPartition tp = new TopicPartition(record.topic(), partition); - RecordSend send = accumulator.append(tp, key, value, CompressionType.NONE, callback); + FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback); this.sender.wakeup(); - return send; - } catch (InterruptedException e) { - throw new KafkaException(e); + return future; + } catch (Exception e) { + if (callback != null) + callback.onCompletion(null, e); + return new FutureFailure(e); } } @@ -207,15 +214,19 @@ public class KafkaProducer implements Producer { private void ensureValidSize(byte[] key, byte[] value) { int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(key, value); if (serializedSize > this.maxRequestSize) - throw new MessageTooLargeException("The message is " + serializedSize - + " bytes when serialized which is larger than the maximum request size you have configured with the " - + ProducerConfig.MAX_REQUEST_SIZE_CONFIG - + " configuration."); + throw new RecordTooLargeException("The message is " + serializedSize + + " bytes when serialized which is larger than the maximum request size you have configured with the " + + ProducerConfig.MAX_REQUEST_SIZE_CONFIG + + " 
configuration."); if (serializedSize > this.totalMemorySize) - throw new MessageTooLargeException("The message is " + serializedSize - + " bytes when serialized which is larger than the total memory buffer you have configured with the " - + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG - + " configuration."); + throw new RecordTooLargeException("The message is " + serializedSize + + " bytes when serialized which is larger than the total memory buffer you have configured with the " + + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG + + " configuration."); + } + + public List partitionsFor(String topic) { + return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsFor(topic); } @Override @@ -237,4 +248,39 @@ public class KafkaProducer implements Producer { this.metrics.close(); } + private static class FutureFailure implements Future { + + private final ExecutionException exception; + + public FutureFailure(Exception exception) { + this.exception = new ExecutionException(exception); + } + + @Override + public boolean cancel(boolean interrupt) { + return false; + } + + @Override + public RecordMetadata get() throws ExecutionException { + throw this.exception; + } + + @Override + public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException { + throw this.exception; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + + } + } diff --git a/clients/src/main/java/kafka/clients/producer/MockProducer.java b/clients/src/main/java/kafka/clients/producer/MockProducer.java index 2ea2030..ab83d5f 100644 --- a/clients/src/main/java/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/kafka/clients/producer/MockProducer.java @@ -7,11 +7,14 @@ import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; +import kafka.clients.producer.internals.FutureRecordMetadata; +import 
kafka.clients.producer.internals.Partitioner; import kafka.clients.producer.internals.ProduceRequestResult; import kafka.common.Cluster; import kafka.common.Metric; -import kafka.common.Serializer; +import kafka.common.PartitionInfo; import kafka.common.TopicPartition; /** @@ -22,10 +25,8 @@ import kafka.common.TopicPartition; */ public class MockProducer implements Producer { - private final Serializer keySerializer; - private final Serializer valueSerializer; - private final Partitioner partitioner; private final Cluster cluster; + private final Partitioner partitioner = new Partitioner(); private final List sent; private final Deque completions; private boolean autoComplete; @@ -34,21 +35,13 @@ public class MockProducer implements Producer { /** * Create a mock producer * - * @param keySerializer A serializer to use on keys (useful to test your serializer on the values) - * @param valueSerializer A serializer to use on values (useful to test your serializer on the values) - * @param partitioner A partitioner to choose partitions (if null the partition will always be 0) - * @param cluster The cluster to pass to the partitioner (can be null if partitioner is null) + * @param cluster The cluster holding metadata for this producer * @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise * the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after - * {@link #send(ProducerRecord) send()} to complete the call and unblock the @{link RecordSend} that is - * returned. + * {@link #send(ProducerRecord) send()} to complete the call and unblock the @{link + * java.util.concurrent.Future Future<RecordMetadata>} that is returned. 
*/ - public MockProducer(Serializer keySerializer, Serializer valueSerializer, Partitioner partitioner, Cluster cluster, boolean autoComplete) { - if (partitioner != null && (cluster == null | keySerializer == null | valueSerializer == null)) - throw new IllegalArgumentException("If a partitioner is provided a cluster instance and key and value serializer for partitioning must also be given."); - this.keySerializer = keySerializer; - this.valueSerializer = valueSerializer; - this.partitioner = partitioner; + public MockProducer(Cluster cluster, boolean autoComplete) { this.cluster = cluster; this.autoComplete = autoComplete; this.offsets = new HashMap(); @@ -57,17 +50,16 @@ public class MockProducer implements Producer { } /** - * Create a new mock producer with no serializers or partitioner and the given autoComplete setting. + * Create a new mock producer with invented metadata the given autoComplete setting. * - * Equivalent to {@link #MockProducer(Serializer, Serializer, Partitioner, Cluster, boolean) new MockProducer(null, - * null, null, null, autoComplete)} + * Equivalent to {@link #MockProducer(Cluster, boolean) new MockProducer(null, autoComplete)} */ public MockProducer(boolean autoComplete) { - this(null, null, null, null, autoComplete); + this(Cluster.empty(), autoComplete); } /** - * Create a new auto completing mock producer with no serializers or partitioner. + * Create a new auto completing mock producer * * Equivalent to {@link #MockProducer(boolean) new MockProducer(true)} */ @@ -76,39 +68,36 @@ public class MockProducer implements Producer { } /** - * Adds the record to the list of sent records. The {@link RecordSend} returned will be immediately satisfied. + * Adds the record to the list of sent records. The {@link RecordMetadata} returned will be immediately satisfied. 
* * @see #history() */ @Override - public synchronized RecordSend send(ProducerRecord record) { + public synchronized Future send(ProducerRecord record) { return send(record, null); } /** - * Adds the record to the list of sent records. The {@link RecordSend} returned will be immediately satisfied and - * the callback will be synchronously executed. + * Adds the record to the list of sent records. * * @see #history() */ @Override - public synchronized RecordSend send(ProducerRecord record, Callback callback) { - byte[] key = keySerializer == null ? null : keySerializer.toBytes(record.key()); - byte[] partitionKey = keySerializer == null ? null : keySerializer.toBytes(record.partitionKey()); - byte[] value = valueSerializer == null ? null : valueSerializer.toBytes(record.value()); - int numPartitions = partitioner == null ? 0 : this.cluster.partitionsFor(record.topic()).size(); - int partition = partitioner == null ? 0 : partitioner.partition(record, key, partitionKey, value, this.cluster, numPartitions); + public synchronized Future send(ProducerRecord record, Callback callback) { + int partition = 0; + if (this.cluster.partitionsFor(record.topic()) != null) + partition = partitioner.partition(record, this.cluster); ProduceRequestResult result = new ProduceRequestResult(); - RecordSend send = new RecordSend(0, result); + FutureRecordMetadata future = new FutureRecordMetadata(result, 0); TopicPartition topicPartition = new TopicPartition(record.topic(), partition); long offset = nextOffset(topicPartition); - Completion completion = new Completion(topicPartition, offset, send, result, callback); + Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, offset), result, callback); this.sent.add(record); if (autoComplete) completion.complete(null); else this.completions.addLast(completion); - return send; + return future; } /** @@ -126,13 +115,14 @@ public class MockProducer implements Producer { } } + public List 
partitionsFor(String topic) { + return this.cluster.partitionsFor(topic); + } + public Map metrics() { return Collections.emptyMap(); } - /** - * "Closes" the producer - */ @Override public void close() { } @@ -178,13 +168,17 @@ public class MockProducer implements Producer { private static class Completion { private final long offset; - private final RecordSend send; + private final RecordMetadata metadata; private final ProduceRequestResult result; private final Callback callback; private final TopicPartition topicPartition; - public Completion(TopicPartition topicPartition, long offset, RecordSend send, ProduceRequestResult result, Callback callback) { - this.send = send; + public Completion(TopicPartition topicPartition, + long offset, + RecordMetadata metadata, + ProduceRequestResult result, + Callback callback) { + this.metadata = metadata; this.offset = offset; this.result = result; this.callback = callback; @@ -193,8 +187,12 @@ public class MockProducer implements Producer { public void complete(RuntimeException e) { result.done(topicPartition, e == null ? offset : -1L, e); - if (callback != null) - callback.onCompletion(send); + if (callback != null) { + if (e == null) + callback.onCompletion(metadata, null); + else + callback.onCompletion(null, e); + } } } diff --git a/clients/src/main/java/kafka/clients/producer/Partitioner.java b/clients/src/main/java/kafka/clients/producer/Partitioner.java deleted file mode 100644 index 1b8e51f..0000000 --- a/clients/src/main/java/kafka/clients/producer/Partitioner.java +++ /dev/null @@ -1,30 +0,0 @@ -package kafka.clients.producer; - -import kafka.common.Cluster; - -/** - * An interface by which clients can override the default partitioning behavior that maps records to topic partitions. - *

- * A partitioner can use either the original java object the user provided or the serialized bytes. - *

- * It is expected that the partitioner will make use the key for partitioning, but there is no requirement that an - * implementation do so. An implementation can use the key, the value, the state of the cluster, or any other side data. - */ -public interface Partitioner { - - /** - * Compute the partition for the given record. This partition number must be in the range [0...numPartitions). The - * cluster state provided is the most up-to-date view that the client has but leadership can change at any time so - * there is no guarantee that the node that is the leader for a particular partition at the time the partition - * function is called will still be the leader by the time the request is sent. - * - * @param record The record being sent - * @param key The serialized bytes of the key (null if no key is given or the serialized form is null) - * @param value The serialized bytes of the value (null if no value is given or the serialized form is null) - * @param cluster The current state of the cluster - * @param numPartitions The total number of partitions for the given topic - * @return The partition to send this record to - */ - public int partition(ProducerRecord record, byte[] key, byte[] partitionKey, byte[] value, Cluster cluster, int numPartitions); - -} diff --git a/clients/src/main/java/kafka/clients/producer/Producer.java b/clients/src/main/java/kafka/clients/producer/Producer.java index 6ba6633..f149f3a 100644 --- a/clients/src/main/java/kafka/clients/producer/Producer.java +++ b/clients/src/main/java/kafka/clients/producer/Producer.java @@ -1,8 +1,12 @@ package kafka.clients.producer; +import java.io.Closeable; +import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import kafka.common.Metric; +import kafka.common.PartitionInfo; /** * The interface for the {@link KafkaProducer} @@ -10,7 +14,7 @@ import kafka.common.Metric; * @see KafkaProducer * @see MockProducer */ -public interface Producer { +public interface Producer 
extends Closeable { /** * Send the given record asynchronously and return a future which will eventually contain the response information. @@ -18,12 +22,18 @@ public interface Producer { * @param record The record to send * @return A future which will eventually contain the response information */ - public RecordSend send(ProducerRecord record); + public Future send(ProducerRecord record); /** - * Send a message and invoke the given callback when the send is complete + * Send a record and invoke the given callback when the record has been acknowledged by the server */ - public RecordSend send(ProducerRecord record, Callback callback); + public Future send(ProducerRecord record, Callback callback); + + /** + * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change + * over time so this list should not be cached. + */ + public List partitionsFor(String topic); /** * Return a map of metrics maintained by the producer diff --git a/clients/src/main/java/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/kafka/clients/producer/ProducerConfig.java index 9758293..a94afc7 100644 --- a/clients/src/main/java/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/kafka/clients/producer/ProducerConfig.java @@ -25,19 +25,19 @@ public class ProducerConfig extends AbstractConfig { public static final String BROKER_LIST_CONFIG = "metadata.broker.list"; /** - * The amount of time to block waiting to fetch metadata about a topic the first time a message is sent to that + * The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that * topic. */ public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; /** - * The buffer size allocated for a partition. When messages are received which are smaller than this size the + * The buffer size allocated for a partition. 
When records are received which are smaller than this size the * producer will attempt to optimistically group them together until this size is reached. */ public static final String MAX_PARTITION_SIZE_CONFIG = "max.partition.bytes"; /** - * The total memory used by the producer to buffer messages waiting to be sent to the server. If messages are sent + * The total memory used by the producer to buffer records waiting to be sent to the server. If records are sent * faster than they can be delivered to the server the producer will either block or throw an exception based on the * preference specified by {@link #BLOCK_ON_BUFFER_FULL}. */ @@ -56,32 +56,17 @@ public class ProducerConfig extends AbstractConfig { public static final String REQUEST_TIMEOUT_CONFIG = "request.timeout.ms"; /** - * The producer groups together any messages that arrive in between request sends. Normally this occurs only under - * load when messages arrive faster than they can be sent out. However the client can reduce the number of requests - * and increase throughput by adding a small amount of artificial delay to force more messages to batch together. - * This setting gives an upper bound on this delay. If we get {@link #MAX_PARTITION_SIZE_CONFIG} worth of messages + * The producer groups together any records that arrive in between request sends. Normally this occurs only under + * load when records arrive faster than they can be sent out. However the client can reduce the number of requests + * and increase throughput by adding a small amount of artificial delay to force more records to batch together. + * This setting gives an upper bound on this delay. If we get {@link #MAX_PARTITION_SIZE_CONFIG} worth of records * for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many - * bytes accumulated for this partition we will "linger" for the specified time waiting for more messages to show - * up. This setting defaults to 0. 
+ * bytes accumulated for this partition we will "linger" for the specified time waiting for more records to show up. + * This setting defaults to 0. */ public static final String LINGER_MS_CONFIG = "linger.ms"; /** - * The fully qualified name of the {@link kafka.common.Serializer} class to use for serializing record values. - */ - public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer.class"; - - /** - * The fully qualified name of the {@link kafka.common.Serializer} class to use for serializing record keys. - */ - public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer.class"; - - /** - * The class to use for choosing a partition to send the message to - */ - public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class"; - - /** * Force a refresh of the cluster metadata after this period of time. This ensures that changes to the number of * partitions or other settings will by taken up by producers without restart. */ @@ -99,8 +84,8 @@ public class ProducerConfig extends AbstractConfig { public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes"; /** - * The maximum size of a request. This is also effectively a cap on the maximum message size. Note that the server - * has its own cap on message size which may be different from this. + * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server + * has its own cap on record size which may be different from this. */ public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size"; @@ -111,9 +96,9 @@ public class ProducerConfig extends AbstractConfig { public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms"; /** - * When our memory buffer is exhausted we must either stop accepting new messages (block) or throw errors. By - * default this setting is true and we block, however users who want to guarantee we never block can turn this into - * an error. 
+ * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default + * this setting is true and we block, however users who want to guarantee we never block can turn this into an + * error. */ public static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full"; @@ -129,9 +114,6 @@ public class ProducerConfig extends AbstractConfig { .define(REQUIRED_ACKS_CONFIG, Type.INT, 1, between(-1, Short.MAX_VALUE), "blah blah") .define(REQUEST_TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), "blah blah") .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), "blah blah") - .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, "blah blah") - .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, "blah blah") - .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, DefaultPartitioner.class.getName(), "blah blah") .define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L), "blah blah") .define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah") .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah") diff --git a/clients/src/main/java/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/kafka/clients/producer/ProducerRecord.java index 5fddbef..b9c20bc 100644 --- a/clients/src/main/java/kafka/clients/producer/ProducerRecord.java +++ b/clients/src/main/java/kafka/clients/producer/ProducerRecord.java @@ -1,34 +1,34 @@ package kafka.clients.producer; /** - * An unserialized key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, - * a value (which can be null) which is the contents of the record and an optional key (which can also be null). In - * cases the key used for choosing a partition is going to be different the user can specify a partition key which will - * be used only for computing the partition to which this record will be sent and will not be retained with the record. + * A key/value pair to be sent to Kafka. 
This consists of a topic name to which the record is being sent, an optional + * partition number, and an optional key and value. + *

+ * If a valid partition number is specified that partition will be used when sending the record. If no partition is + * specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is + * present a partition will be assigned in a round-robin fashion. */ public final class ProducerRecord { private final String topic; - private final Object key; - private final Object partitionKey; - private final Object value; + private final Integer partition; + private final byte[] key; + private final byte[] value; /** - * Creates a record to be sent to Kafka using a special override key for partitioning that is different form the key - * retained in the record + * Creates a record to be sent to a specified topic and partition * * @param topic The topic the record will be appended to + * @param partition The partition to which the record should be sent * @param key The key that will be included in the record - * @param partitionKey An override for the key to be used only for partitioning purposes in the client. This key - * will not be retained or available to downstream consumers. 
* @param value The record contents */ - public ProducerRecord(String topic, Object key, Object partitionKey, Object value) { + public ProducerRecord(String topic, Integer partition, byte[] key, byte[] value) { if (topic == null) throw new IllegalArgumentException("Topic cannot be null"); this.topic = topic; + this.partition = partition; this.key = key; - this.partitionKey = partitionKey; this.value = value; } @@ -39,8 +39,8 @@ public final class ProducerRecord { * @param key The key that will be included in the record * @param value The record contents */ - public ProducerRecord(String topic, Object key, Object value) { - this(topic, key, key, value); + public ProducerRecord(String topic, byte[] key, byte[] value) { + this(topic, null, key, value); } /** @@ -49,7 +49,7 @@ public final class ProducerRecord { * @param topic The topic this record should be sent to * @param value The record contents */ - public ProducerRecord(String topic, Object value) { + public ProducerRecord(String topic, byte[] value) { this(topic, null, value); } @@ -63,22 +63,22 @@ public final class ProducerRecord { /** * The key (or null if no key is specified) */ - public Object key() { + public byte[] key() { return key; } /** - * An override key to use instead of the main record key + * @return The value */ - public Object partitionKey() { - return partitionKey; + public byte[] value() { + return value; } /** - * @return The value + * The partition to which the record will be sent (or null if no partition was specified) */ - public Object value() { - return value; + public Integer partition() { + return partition; } } diff --git a/clients/src/main/java/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/kafka/clients/producer/RecordMetadata.java new file mode 100644 index 0000000..1486586 --- /dev/null +++ b/clients/src/main/java/kafka/clients/producer/RecordMetadata.java @@ -0,0 +1,39 @@ +package kafka.clients.producer; + +import kafka.common.TopicPartition; + +/** + * The 
metadata for a record that has been acknowledged by the server + */ +public final class RecordMetadata { + + private final long offset; + private final TopicPartition topicPartition; + + public RecordMetadata(TopicPartition topicPartition, long offset) { + super(); + this.offset = offset; + this.topicPartition = topicPartition; + } + + /** + * The offset of the record in the topic/partition. + */ + public long offset() { + return this.offset; + } + + /** + * The topic the record was appended to + */ + public String topic() { + return this.topicPartition.topic(); + } + + /** + * The partition the record was sent to + */ + public int partition() { + return this.topicPartition.partition(); + } +} diff --git a/clients/src/main/java/kafka/clients/producer/RecordSend.java b/clients/src/main/java/kafka/clients/producer/RecordSend.java deleted file mode 100644 index 1883dab..0000000 --- a/clients/src/main/java/kafka/clients/producer/RecordSend.java +++ /dev/null @@ -1,88 +0,0 @@ -package kafka.clients.producer; - -import java.util.concurrent.TimeUnit; - -import kafka.clients.producer.internals.ProduceRequestResult; -import kafka.common.errors.ApiException; -import kafka.common.errors.TimeoutException; - -/** - * An asynchronously computed response from sending a record. Calling await() or most of the other accessor - * methods will block until the response for this record is available. If you wish to avoid blocking provide a - * {@link kafka.clients.producer.Callback Callback} with the record send. - */ -public final class RecordSend { - - private final long relativeOffset; - private final ProduceRequestResult result; - - public RecordSend(long relativeOffset, ProduceRequestResult result) { - this.relativeOffset = relativeOffset; - this.result = result; - } - - /** - * Block until this send has completed successfully. If the request fails, throw the error that occurred in sending - * the request. 
- * @return the same object for chaining of calls - * @throws TimeoutException if the thread is interrupted while waiting - * @throws ApiException if the request failed. - */ - public RecordSend await() { - result.await(); - if (result.error() != null) - throw result.error(); - return this; - } - - /** - * Block until this send is complete or the given timeout elapses - * @param timeout the time to wait - * @param unit the units of the time given - * @return the same object for chaining - * @throws TimeoutException if the request isn't satisfied in the time period given or the thread is interrupted - * while waiting - * @throws ApiException if the request failed. - */ - public RecordSend await(long timeout, TimeUnit unit) { - boolean success = result.await(timeout, unit); - if (!success) - throw new TimeoutException("Request did not complete after " + timeout + " " + unit); - if (result.error() != null) - throw result.error(); - return this; - } - - /** - * Get the offset for the given message. This method will block until the request is complete and will throw an - * exception if the request fails. - * @return The offset - */ - public long offset() { - await(); - return this.result.baseOffset() + this.relativeOffset; - } - - /** - * Check if the request is complete without blocking - */ - public boolean completed() { - return this.result.completed(); - } - - /** - * Block on request completion and return true if there was an error. 
- */ - public boolean hasError() { - result.await(); - return this.result.error() != null; - } - - /** - * Return the error thrown - */ - public Exception error() { - result.await(); - return this.result.error(); - } -} diff --git a/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java new file mode 100644 index 0000000..43b4c5d --- /dev/null +++ b/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java @@ -0,0 +1,63 @@ +package kafka.clients.producer.internals; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import kafka.clients.producer.RecordMetadata; + +/** + * The future result of a record send + */ +public final class FutureRecordMetadata implements Future { + + private final ProduceRequestResult result; + private final long relativeOffset; + + public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset) { + this.result = result; + this.relativeOffset = relativeOffset; + } + + @Override + public boolean cancel(boolean interrupt) { + return false; + } + + @Override + public RecordMetadata get() throws InterruptedException, ExecutionException { + this.result.await(); + return valueOrError(); + } + + @Override + public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + boolean occurred = this.result.await(timeout, unit); + if (!occurred) + throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms."); + return valueOrError(); + } + + private RecordMetadata valueOrError() throws ExecutionException { + if (this.result.error() != null) + throw new ExecutionException(this.result.error()); + else + return new RecordMetadata(result.topicPartition(), this.result.baseOffset() + 
this.relativeOffset); + } + + public long relativeOffset() { + return this.relativeOffset; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return this.result.completed(); + } + +} diff --git a/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java new file mode 100644 index 0000000..6d2188e --- /dev/null +++ b/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java @@ -0,0 +1,55 @@ +package kafka.clients.producer.internals; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; + +import kafka.clients.producer.ProducerRecord; +import kafka.common.Cluster; +import kafka.common.PartitionInfo; +import kafka.common.utils.Utils; + +/** + * The default partitioning strategy: + *