diff --git a/clients/src/main/java/kafka/clients/producer/Callback.java b/clients/src/main/java/kafka/clients/producer/Callback.java
index 47e5af3..d287d78 100644
--- a/clients/src/main/java/kafka/clients/producer/Callback.java
+++ b/clients/src/main/java/kafka/clients/producer/Callback.java
@@ -2,14 +2,17 @@ package kafka.clients.producer;
/**
* A callback interface that the user can implement to allow code to execute when the request is complete. This callback
- * will execute in the background I/O thread so it should be fast.
+ * will generally execute in the background I/O thread so it should be fast.
*/
public interface Callback {
/**
- * A callback method the user should implement. This method will be called when the send to the server has
- * completed.
- * @param send The results of the call. This send is guaranteed to be completed so none of its methods will block.
+ * A callback method the user can implement to provide asynchronous handling of request completion. This method will
+ * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be
+ * non-null.
+ * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error
+ * occurred.
+ * @param exception The exception thrown during processing of this record. Null if no error occurred.
*/
- public void onCompletion(RecordSend send);
+ public void onCompletion(RecordMetadata metadata, Exception exception);
}
diff --git a/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java b/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java
deleted file mode 100644
index b82fcfb..0000000
--- a/clients/src/main/java/kafka/clients/producer/DefaultPartitioner.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package kafka.clients.producer;
-
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import kafka.common.Cluster;
-import kafka.common.utils.Utils;
-
-/**
- * A simple partitioning strategy that will work for messages with or without keys.
- *
- * If there is a partition key specified in the record the partitioner will use that for partitioning. Otherwise, if
- * there there is no partitionKey but there is a normal key that will be used. If neither key is specified the
- * partitioner will round-robin over partitions in the topic.
- *
- * For the cases where there is some key present the partition is computed based on the murmur2 hash of the serialized
- * key.
- */
-public class DefaultPartitioner implements Partitioner {
-
- private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
-
- /**
- * Compute the partition
- */
- @Override
- public int partition(ProducerRecord record, byte[] key, byte[] partitionKey, byte[] value, Cluster cluster, int numPartitions) {
- byte[] keyToUse = partitionKey != null ? partitionKey : key;
- if (keyToUse == null)
- return Utils.abs(counter.getAndIncrement()) % numPartitions;
- else
- return Utils.abs(Utils.murmur2(keyToUse)) % numPartitions;
- }
-
-}
diff --git a/clients/src/main/java/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
index 58eee0c..1dd63fc 100644
--- a/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/kafka/clients/producer/KafkaProducer.java
@@ -6,17 +6,22 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import kafka.clients.producer.internals.FutureRecordMetadata;
import kafka.clients.producer.internals.Metadata;
+import kafka.clients.producer.internals.Partitioner;
import kafka.clients.producer.internals.RecordAccumulator;
import kafka.clients.producer.internals.Sender;
import kafka.common.Cluster;
import kafka.common.KafkaException;
import kafka.common.Metric;
-import kafka.common.Serializer;
+import kafka.common.PartitionInfo;
import kafka.common.TopicPartition;
import kafka.common.config.ConfigException;
-import kafka.common.errors.MessageTooLargeException;
+import kafka.common.errors.RecordTooLargeException;
import kafka.common.metrics.JmxReporter;
import kafka.common.metrics.MetricConfig;
import kafka.common.metrics.Metrics;
@@ -29,24 +34,22 @@ import kafka.common.utils.KafkaThread;
import kafka.common.utils.SystemTime;
/**
- * A Kafka producer that can be used to send data to the Kafka cluster.
+ * A Kafka client that publishes records to the Kafka cluster.
*
* The producer is thread safe and should generally be shared among all threads for best performance.
*
* The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it
- * needs to communicate with. Failure to close the producer after use will leak these.
+ * needs to communicate with. Failure to close the producer after use will leak these resources.
*/
public class KafkaProducer implements Producer {
+ private final Partitioner partitioner;
private final int maxRequestSize;
private final long metadataFetchTimeoutMs;
private final long totalMemorySize;
- private final Partitioner partitioner;
private final Metadata metadata;
private final RecordAccumulator accumulator;
private final Sender sender;
- private final Serializer keySerializer;
- private final Serializer valueSerializer;
private final Metrics metrics;
private final Thread ioThread;
@@ -72,9 +75,7 @@ public class KafkaProducer implements Producer {
this.metrics = new Metrics(new MetricConfig(),
Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")),
new SystemTime());
- this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
- this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
- this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
+ this.partitioner = new Partitioner();
this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
this.metadata = new Metadata();
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
@@ -126,78 +127,84 @@ public class KafkaProducer implements Producer {
* Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)}
*/
@Override
- public RecordSend send(ProducerRecord record) {
+ public Future send(ProducerRecord record) {
return send(record, null);
}
/**
* Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
*
- * The send is asynchronous and this method will return immediately once the record has been serialized and stored
- * in the buffer of messages waiting to be sent. This allows sending many records in parallel without necessitating
- * blocking to wait for the response after each one.
+ * The send is asynchronous and this method will return immediately once the record has been stored in the buffer of
+ * records waiting to be sent. This allows sending many records in parallel without blocking to wait for the
+ * response after each one.
+ *
+ * The result of the send is a {@link RecordMetadata} specifying the partition the record was sent to and the offset
+ * it was assigned.
+ *
+ * Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the
+ * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get()
+ * get()} on this future will result in the metadata for the record or throw any exception that occurred while
+ * sending the record.
*
- * The {@link RecordSend} returned by this call will hold the future response data including the offset assigned to
- * the message and the error (if any) when the request has completed (or returned an error), and this object can be
- * used to block awaiting the response. If you want the equivalent of a simple blocking send you can easily achieve
- * that using the {@link kafka.clients.producer.RecordSend#await() await()} method on the {@link RecordSend} this
- * call returns:
+ * If you want to simulate a simple blocking call you can do the following:
*
*
- * ProducerRecord record = new ProducerRecord("the-topic", "key, "value");
- * producer.send(myRecord, null).await();
+ * producer.send(new ProducerRecord("the-topic", "key", "value")).get();
*
- *
- * Note that the send method will not throw an exception if the request fails while communicating with the cluster,
- * rather that exception will be thrown when accessing the {@link RecordSend} that is returned.
*
* Those desiring fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
- * will be invoked when the request is complete. Note that the callback will execute in the I/O thread of the
- * producer and so should be reasonably fast. An example usage of an inline callback would be the following:
+ * will be invoked when the request is complete.
*
*
* ProducerRecord record = new ProducerRecord("the-topic", "key, "value");
* producer.send(myRecord,
* new Callback() {
- * public void onCompletion(RecordSend send) {
- * try {
- * System.out.println("The offset of the message we just sent is: " + send.offset());
- * } catch(KafkaException e) {
+ * public void onCompletion(RecordMetadata metadata, Exception e) {
+ * if(e != null)
* e.printStackTrace();
- * }
+ *                 else
+ *                     System.out.println("The offset of the record we just sent is: " + metadata.offset());
* }
* });
*
+ *
+ * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the
+ * following example callback1 is guaranteed to execute before callback2:
+ *
+ *
+ * producer.send(new ProducerRecord(topic, partition, key, value), callback1);
+ * producer.send(new ProducerRecord(topic, partition, key2, value2), callback2);
+ *
*
- * This call enqueues the message in the buffer of outgoing messages to be sent. This buffer has a hard limit on
- * it's size controlled by the configuration total.memory.bytes. If send() is called
- * faster than the I/O thread can send data to the brokers we will eventually run out of buffer space. The default
- * behavior in this case is to block the send call until the I/O thread catches up and more buffer space is
- * available. However if non-blocking usage is desired the setting block.on.buffer.full=false will
- * cause the producer to instead throw an exception when this occurs.
+ * Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or
+ * they will delay the sending of messages from other threads. If you want to execute blocking or computationally
+ * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
+ * to parallelize processing.
+ *
+ * The producer manages a buffer of records waiting to be sent. This buffer has a hard limit on its size, which is
+ * controlled by the configuration total.memory.bytes. If send() is called faster than the
+ * I/O thread can transfer data to the brokers the buffer will eventually run out of space. The default behavior in
+ * this case is to block the send call until the I/O thread catches up and more buffer space is available. However
+ * in cases where non-blocking usage is desired the setting block.on.buffer.full=false will cause the
+ * producer to instead throw an exception when buffer memory is exhausted.
*
* @param record The record to send
* @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
* indicates no callback)
- * @throws BufferExhausedException This exception is thrown if the buffer is full and blocking has been disabled.
- * @throws MessageTooLargeException This exception is thrown if the serialized size of the message is larger than
- * the maximum buffer memory or maximum request size that has been configured (whichever is smaller).
*/
@Override
- public RecordSend send(ProducerRecord record, Callback callback) {
- Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
- byte[] key = keySerializer.toBytes(record.key());
- byte[] value = valueSerializer.toBytes(record.value());
- byte[] partitionKey = keySerializer.toBytes(record.partitionKey());
- int partition = partitioner.partition(record, key, partitionKey, value, cluster, cluster.partitionsFor(record.topic()).size());
- ensureValidSize(key, value);
+ public Future send(ProducerRecord record, Callback callback) {
try {
+ Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
+ int partition = partitioner.partition(record, cluster);
+ ensureValidSize(record.key(), record.value());
TopicPartition tp = new TopicPartition(record.topic(), partition);
- RecordSend send = accumulator.append(tp, key, value, CompressionType.NONE, callback);
+ FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
this.sender.wakeup();
- return send;
- } catch (InterruptedException e) {
- throw new KafkaException(e);
+ return future;
+ } catch (Exception e) {
+ if (callback != null)
+ callback.onCompletion(null, e);
+ return new FutureFailure(e);
}
}
@@ -207,15 +214,19 @@ public class KafkaProducer implements Producer {
private void ensureValidSize(byte[] key, byte[] value) {
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
if (serializedSize > this.maxRequestSize)
- throw new MessageTooLargeException("The message is " + serializedSize
- + " bytes when serialized which is larger than the maximum request size you have configured with the "
- + ProducerConfig.MAX_REQUEST_SIZE_CONFIG
- + " configuration.");
+ throw new RecordTooLargeException("The message is " + serializedSize
+ + " bytes when serialized which is larger than the maximum request size you have configured with the "
+ + ProducerConfig.MAX_REQUEST_SIZE_CONFIG
+ + " configuration.");
if (serializedSize > this.totalMemorySize)
- throw new MessageTooLargeException("The message is " + serializedSize
- + " bytes when serialized which is larger than the total memory buffer you have configured with the "
- + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG
- + " configuration.");
+ throw new RecordTooLargeException("The message is " + serializedSize
+ + " bytes when serialized which is larger than the total memory buffer you have configured with the "
+ + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG
+ + " configuration.");
+ }
+
+ public List partitionsFor(String topic) {
+ return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsFor(topic);
}
@Override
@@ -237,4 +248,39 @@ public class KafkaProducer implements Producer {
this.metrics.close();
}
+ private static class FutureFailure implements Future {
+
+ private final ExecutionException exception;
+
+ public FutureFailure(Exception exception) {
+ this.exception = new ExecutionException(exception);
+ }
+
+ @Override
+ public boolean cancel(boolean interrupt) {
+ return false;
+ }
+
+ @Override
+ public RecordMetadata get() throws ExecutionException {
+ throw this.exception;
+ }
+
+ @Override
+ public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
+ throw this.exception;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return true;
+ }
+
+ }
+
}
diff --git a/clients/src/main/java/kafka/clients/producer/MockProducer.java b/clients/src/main/java/kafka/clients/producer/MockProducer.java
index 2ea2030..ab83d5f 100644
--- a/clients/src/main/java/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/kafka/clients/producer/MockProducer.java
@@ -7,11 +7,14 @@ import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Future;
+import kafka.clients.producer.internals.FutureRecordMetadata;
+import kafka.clients.producer.internals.Partitioner;
import kafka.clients.producer.internals.ProduceRequestResult;
import kafka.common.Cluster;
import kafka.common.Metric;
-import kafka.common.Serializer;
+import kafka.common.PartitionInfo;
import kafka.common.TopicPartition;
/**
@@ -22,10 +25,8 @@ import kafka.common.TopicPartition;
*/
public class MockProducer implements Producer {
- private final Serializer keySerializer;
- private final Serializer valueSerializer;
- private final Partitioner partitioner;
private final Cluster cluster;
+ private final Partitioner partitioner = new Partitioner();
private final List sent;
private final Deque completions;
private boolean autoComplete;
@@ -34,21 +35,13 @@ public class MockProducer implements Producer {
/**
* Create a mock producer
*
- * @param keySerializer A serializer to use on keys (useful to test your serializer on the values)
- * @param valueSerializer A serializer to use on values (useful to test your serializer on the values)
- * @param partitioner A partitioner to choose partitions (if null the partition will always be 0)
- * @param cluster The cluster to pass to the partitioner (can be null if partitioner is null)
+ * @param cluster The cluster holding metadata for this producer
* @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise
* the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after
- * {@link #send(ProducerRecord) send()} to complete the call and unblock the @{link RecordSend} that is
- * returned.
+ * {@link #send(ProducerRecord) send()} to complete the call and unblock the {@link
+ * java.util.concurrent.Future Future<RecordMetadata>} that is returned.
*/
- public MockProducer(Serializer keySerializer, Serializer valueSerializer, Partitioner partitioner, Cluster cluster, boolean autoComplete) {
- if (partitioner != null && (cluster == null | keySerializer == null | valueSerializer == null))
- throw new IllegalArgumentException("If a partitioner is provided a cluster instance and key and value serializer for partitioning must also be given.");
- this.keySerializer = keySerializer;
- this.valueSerializer = valueSerializer;
- this.partitioner = partitioner;
+ public MockProducer(Cluster cluster, boolean autoComplete) {
this.cluster = cluster;
this.autoComplete = autoComplete;
this.offsets = new HashMap();
@@ -57,17 +50,16 @@ public class MockProducer implements Producer {
}
/**
- * Create a new mock producer with no serializers or partitioner and the given autoComplete setting.
+ * Create a new mock producer with invented metadata and the given autoComplete setting.
*
- * Equivalent to {@link #MockProducer(Serializer, Serializer, Partitioner, Cluster, boolean) new MockProducer(null,
- * null, null, null, autoComplete)}
+ * Equivalent to {@link #MockProducer(Cluster, boolean) new MockProducer(null, autoComplete)}
*/
public MockProducer(boolean autoComplete) {
- this(null, null, null, null, autoComplete);
+ this(Cluster.empty(), autoComplete);
}
/**
- * Create a new auto completing mock producer with no serializers or partitioner.
+ * Create a new auto completing mock producer
*
* Equivalent to {@link #MockProducer(boolean) new MockProducer(true)}
*/
@@ -76,39 +68,36 @@ public class MockProducer implements Producer {
}
/**
- * Adds the record to the list of sent records. The {@link RecordSend} returned will be immediately satisfied.
+ * Adds the record to the list of sent records. The {@link RecordMetadata} returned will be immediately satisfied.
*
* @see #history()
*/
@Override
- public synchronized RecordSend send(ProducerRecord record) {
+ public synchronized Future send(ProducerRecord record) {
return send(record, null);
}
/**
- * Adds the record to the list of sent records. The {@link RecordSend} returned will be immediately satisfied and
- * the callback will be synchronously executed.
+ * Adds the record to the list of sent records.
*
* @see #history()
*/
@Override
- public synchronized RecordSend send(ProducerRecord record, Callback callback) {
- byte[] key = keySerializer == null ? null : keySerializer.toBytes(record.key());
- byte[] partitionKey = keySerializer == null ? null : keySerializer.toBytes(record.partitionKey());
- byte[] value = valueSerializer == null ? null : valueSerializer.toBytes(record.value());
- int numPartitions = partitioner == null ? 0 : this.cluster.partitionsFor(record.topic()).size();
- int partition = partitioner == null ? 0 : partitioner.partition(record, key, partitionKey, value, this.cluster, numPartitions);
+ public synchronized Future send(ProducerRecord record, Callback callback) {
+ int partition = 0;
+ if (this.cluster.partitionsFor(record.topic()) != null)
+ partition = partitioner.partition(record, this.cluster);
ProduceRequestResult result = new ProduceRequestResult();
- RecordSend send = new RecordSend(0, result);
+ FutureRecordMetadata future = new FutureRecordMetadata(result, 0);
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
long offset = nextOffset(topicPartition);
- Completion completion = new Completion(topicPartition, offset, send, result, callback);
+ Completion completion = new Completion(topicPartition, offset, new RecordMetadata(topicPartition, offset), result, callback);
this.sent.add(record);
if (autoComplete)
completion.complete(null);
else
this.completions.addLast(completion);
- return send;
+ return future;
}
/**
@@ -126,13 +115,14 @@ public class MockProducer implements Producer {
}
}
+ public List partitionsFor(String topic) {
+ return this.cluster.partitionsFor(topic);
+ }
+
public Map metrics() {
return Collections.emptyMap();
}
- /**
- * "Closes" the producer
- */
@Override
public void close() {
}
@@ -178,13 +168,17 @@ public class MockProducer implements Producer {
private static class Completion {
private final long offset;
- private final RecordSend send;
+ private final RecordMetadata metadata;
private final ProduceRequestResult result;
private final Callback callback;
private final TopicPartition topicPartition;
- public Completion(TopicPartition topicPartition, long offset, RecordSend send, ProduceRequestResult result, Callback callback) {
- this.send = send;
+ public Completion(TopicPartition topicPartition,
+ long offset,
+ RecordMetadata metadata,
+ ProduceRequestResult result,
+ Callback callback) {
+ this.metadata = metadata;
this.offset = offset;
this.result = result;
this.callback = callback;
@@ -193,8 +187,12 @@ public class MockProducer implements Producer {
public void complete(RuntimeException e) {
result.done(topicPartition, e == null ? offset : -1L, e);
- if (callback != null)
- callback.onCompletion(send);
+ if (callback != null) {
+ if (e == null)
+ callback.onCompletion(metadata, null);
+ else
+ callback.onCompletion(null, e);
+ }
}
}
diff --git a/clients/src/main/java/kafka/clients/producer/Partitioner.java b/clients/src/main/java/kafka/clients/producer/Partitioner.java
deleted file mode 100644
index 1b8e51f..0000000
--- a/clients/src/main/java/kafka/clients/producer/Partitioner.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package kafka.clients.producer;
-
-import kafka.common.Cluster;
-
-/**
- * An interface by which clients can override the default partitioning behavior that maps records to topic partitions.
- *
- * A partitioner can use either the original java object the user provided or the serialized bytes.
- *
- * It is expected that the partitioner will make use the key for partitioning, but there is no requirement that an
- * implementation do so. An implementation can use the key, the value, the state of the cluster, or any other side data.
- */
-public interface Partitioner {
-
- /**
- * Compute the partition for the given record. This partition number must be in the range [0...numPartitions). The
- * cluster state provided is the most up-to-date view that the client has but leadership can change at any time so
- * there is no guarantee that the node that is the leader for a particular partition at the time the partition
- * function is called will still be the leader by the time the request is sent.
- *
- * @param record The record being sent
- * @param key The serialized bytes of the key (null if no key is given or the serialized form is null)
- * @param value The serialized bytes of the value (null if no value is given or the serialized form is null)
- * @param cluster The current state of the cluster
- * @param numPartitions The total number of partitions for the given topic
- * @return The partition to send this record to
- */
- public int partition(ProducerRecord record, byte[] key, byte[] partitionKey, byte[] value, Cluster cluster, int numPartitions);
-
-}
diff --git a/clients/src/main/java/kafka/clients/producer/Producer.java b/clients/src/main/java/kafka/clients/producer/Producer.java
index 6ba6633..f149f3a 100644
--- a/clients/src/main/java/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/kafka/clients/producer/Producer.java
@@ -1,8 +1,12 @@
package kafka.clients.producer;
+import java.io.Closeable;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.Future;
import kafka.common.Metric;
+import kafka.common.PartitionInfo;
/**
* The interface for the {@link KafkaProducer}
@@ -10,7 +14,7 @@ import kafka.common.Metric;
* @see KafkaProducer
* @see MockProducer
*/
-public interface Producer {
+public interface Producer extends Closeable {
/**
* Send the given record asynchronously and return a future which will eventually contain the response information.
@@ -18,12 +22,18 @@ public interface Producer {
* @param record The record to send
* @return A future which will eventually contain the response information
*/
- public RecordSend send(ProducerRecord record);
+ public Future send(ProducerRecord record);
/**
- * Send a message and invoke the given callback when the send is complete
+ * Send a record and invoke the given callback when the record has been acknowledged by the server
*/
- public RecordSend send(ProducerRecord record, Callback callback);
+ public Future send(ProducerRecord record, Callback callback);
+
+ /**
+ * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
+ * over time so this list should not be cached.
+ */
+ public List partitionsFor(String topic);
/**
* Return a map of metrics maintained by the producer
diff --git a/clients/src/main/java/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
index 9758293..a94afc7 100644
--- a/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/kafka/clients/producer/ProducerConfig.java
@@ -25,19 +25,19 @@ public class ProducerConfig extends AbstractConfig {
public static final String BROKER_LIST_CONFIG = "metadata.broker.list";
/**
- * The amount of time to block waiting to fetch metadata about a topic the first time a message is sent to that
+ * The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that
* topic.
*/
public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
/**
- * The buffer size allocated for a partition. When messages are received which are smaller than this size the
+ * The buffer size allocated for a partition. When records are received which are smaller than this size the
* producer will attempt to optimistically group them together until this size is reached.
*/
public static final String MAX_PARTITION_SIZE_CONFIG = "max.partition.bytes";
/**
- * The total memory used by the producer to buffer messages waiting to be sent to the server. If messages are sent
+ * The total memory used by the producer to buffer records waiting to be sent to the server. If records are sent
* faster than they can be delivered to the server the producer will either block or throw an exception based on the
* preference specified by {@link #BLOCK_ON_BUFFER_FULL}.
*/
@@ -56,32 +56,17 @@ public class ProducerConfig extends AbstractConfig {
public static final String REQUEST_TIMEOUT_CONFIG = "request.timeout.ms";
/**
- * The producer groups together any messages that arrive in between request sends. Normally this occurs only under
- * load when messages arrive faster than they can be sent out. However the client can reduce the number of requests
- * and increase throughput by adding a small amount of artificial delay to force more messages to batch together.
- * This setting gives an upper bound on this delay. If we get {@link #MAX_PARTITION_SIZE_CONFIG} worth of messages
+ * The producer groups together any records that arrive in between request sends. Normally this occurs only under
+ * load when records arrive faster than they can be sent out. However the client can reduce the number of requests
+ * and increase throughput by adding a small amount of artificial delay to force more records to batch together.
+ * This setting gives an upper bound on this delay. If we get {@link #MAX_PARTITION_SIZE_CONFIG} worth of records
* for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many
- * bytes accumulated for this partition we will "linger" for the specified time waiting for more messages to show
- * up. This setting defaults to 0.
+ * bytes accumulated for this partition we will "linger" for the specified time waiting for more records to show up.
+ * This setting defaults to 0.
*/
public static final String LINGER_MS_CONFIG = "linger.ms";
/**
- * The fully qualified name of the {@link kafka.common.Serializer} class to use for serializing record values.
- */
- public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer.class";
-
- /**
- * The fully qualified name of the {@link kafka.common.Serializer} class to use for serializing record keys.
- */
- public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer.class";
-
- /**
- * The class to use for choosing a partition to send the message to
- */
- public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
-
- /**
* Force a refresh of the cluster metadata after this period of time. This ensures that changes to the number of
* partitions or other settings will by taken up by producers without restart.
*/
@@ -99,8 +84,8 @@ public class ProducerConfig extends AbstractConfig {
public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
/**
- * The maximum size of a request. This is also effectively a cap on the maximum message size. Note that the server
- * has its own cap on message size which may be different from this.
+ * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server
+ * has its own cap on record size which may be different from this.
*/
public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
@@ -111,9 +96,9 @@ public class ProducerConfig extends AbstractConfig {
public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
/**
- * When our memory buffer is exhausted we must either stop accepting new messages (block) or throw errors. By
- * default this setting is true and we block, however users who want to guarantee we never block can turn this into
- * an error.
+ * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default
+ * this setting is true and we block, however users who want to guarantee we never block can turn this into an
+ * error.
*/
public static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full";
@@ -129,9 +114,6 @@ public class ProducerConfig extends AbstractConfig {
.define(REQUIRED_ACKS_CONFIG, Type.INT, 1, between(-1, Short.MAX_VALUE), "blah blah")
.define(REQUEST_TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), "blah blah")
.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), "blah blah")
- .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, "blah blah")
- .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, "blah blah")
- .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, DefaultPartitioner.class.getName(), "blah blah")
.define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L), "blah blah")
.define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah")
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah")
diff --git a/clients/src/main/java/kafka/clients/producer/ProducerRecord.java b/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
index 5fddbef..b9c20bc 100644
--- a/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
+++ b/clients/src/main/java/kafka/clients/producer/ProducerRecord.java
@@ -1,34 +1,34 @@
package kafka.clients.producer;
/**
- * An unserialized key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent,
- * a value (which can be null) which is the contents of the record and an optional key (which can also be null). In
- * cases the key used for choosing a partition is going to be different the user can specify a partition key which will
- * be used only for computing the partition to which this record will be sent and will not be retained with the record.
+ * A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional
+ * partition number, and an optional key and value.
+ *
+ * If a valid partition number is specified that partition will be used when sending the record. If no partition is
+ * specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is
+ * present a partition will be assigned in a round-robin fashion.
*/
public final class ProducerRecord {
private final String topic;
- private final Object key;
- private final Object partitionKey;
- private final Object value;
+ private final Integer partition;
+ private final byte[] key;
+ private final byte[] value;
/**
- * Creates a record to be sent to Kafka using a special override key for partitioning that is different form the key
- * retained in the record
+ * Creates a record to be sent to a specified topic and partition
*
* @param topic The topic the record will be appended to
+ * @param partition The partition to which the record should be sent
* @param key The key that will be included in the record
- * @param partitionKey An override for the key to be used only for partitioning purposes in the client. This key
- * will not be retained or available to downstream consumers.
* @param value The record contents
*/
- public ProducerRecord(String topic, Object key, Object partitionKey, Object value) {
+ public ProducerRecord(String topic, Integer partition, byte[] key, byte[] value) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null");
this.topic = topic;
+ this.partition = partition;
this.key = key;
- this.partitionKey = partitionKey;
this.value = value;
}
@@ -39,8 +39,8 @@ public final class ProducerRecord {
* @param key The key that will be included in the record
* @param value The record contents
*/
- public ProducerRecord(String topic, Object key, Object value) {
- this(topic, key, key, value);
+ public ProducerRecord(String topic, byte[] key, byte[] value) {
+ this(topic, null, key, value);
}
/**
@@ -49,7 +49,7 @@ public final class ProducerRecord {
* @param topic The topic this record should be sent to
* @param value The record contents
*/
- public ProducerRecord(String topic, Object value) {
+ public ProducerRecord(String topic, byte[] value) {
this(topic, null, value);
}
@@ -63,22 +63,22 @@ public final class ProducerRecord {
/**
* The key (or null if no key is specified)
*/
- public Object key() {
+ public byte[] key() {
return key;
}
/**
- * An override key to use instead of the main record key
+ * @return The value
*/
- public Object partitionKey() {
- return partitionKey;
+ public byte[] value() {
+ return value;
}
/**
- * @return The value
+ * The partition to which the record will be sent (or null if no partition was specified)
*/
- public Object value() {
- return value;
+ public Integer partition() {
+ return partition;
}
}
diff --git a/clients/src/main/java/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/kafka/clients/producer/RecordMetadata.java
new file mode 100644
index 0000000..1486586
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/RecordMetadata.java
@@ -0,0 +1,39 @@
+package kafka.clients.producer;
+
+import kafka.common.TopicPartition;
+
+/**
+ * The metadata for a record that has been acknowledged by the server
+ */
+public final class RecordMetadata {
+
+ private final long offset;
+ private final TopicPartition topicPartition;
+
+ public RecordMetadata(TopicPartition topicPartition, long offset) {
+ super();
+ this.offset = offset;
+ this.topicPartition = topicPartition;
+ }
+
+ /**
+ * The offset of the record in the topic/partition.
+ */
+ public long offset() {
+ return this.offset;
+ }
+
+ /**
+ * The topic the record was appended to
+ */
+ public String topic() {
+ return this.topicPartition.topic();
+ }
+
+ /**
+ * The partition the record was sent to
+ */
+ public int partition() {
+ return this.topicPartition.partition();
+ }
+}
diff --git a/clients/src/main/java/kafka/clients/producer/RecordSend.java b/clients/src/main/java/kafka/clients/producer/RecordSend.java
deleted file mode 100644
index 1883dab..0000000
--- a/clients/src/main/java/kafka/clients/producer/RecordSend.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package kafka.clients.producer;
-
-import java.util.concurrent.TimeUnit;
-
-import kafka.clients.producer.internals.ProduceRequestResult;
-import kafka.common.errors.ApiException;
-import kafka.common.errors.TimeoutException;
-
-/**
- * An asynchronously computed response from sending a record. Calling await() or most of the other accessor
- * methods will block until the response for this record is available. If you wish to avoid blocking provide a
- * {@link kafka.clients.producer.Callback Callback} with the record send.
- */
-public final class RecordSend {
-
- private final long relativeOffset;
- private final ProduceRequestResult result;
-
- public RecordSend(long relativeOffset, ProduceRequestResult result) {
- this.relativeOffset = relativeOffset;
- this.result = result;
- }
-
- /**
- * Block until this send has completed successfully. If the request fails, throw the error that occurred in sending
- * the request.
- * @return the same object for chaining of calls
- * @throws TimeoutException if the thread is interrupted while waiting
- * @throws ApiException if the request failed.
- */
- public RecordSend await() {
- result.await();
- if (result.error() != null)
- throw result.error();
- return this;
- }
-
- /**
- * Block until this send is complete or the given timeout elapses
- * @param timeout the time to wait
- * @param unit the units of the time given
- * @return the same object for chaining
- * @throws TimeoutException if the request isn't satisfied in the time period given or the thread is interrupted
- * while waiting
- * @throws ApiException if the request failed.
- */
- public RecordSend await(long timeout, TimeUnit unit) {
- boolean success = result.await(timeout, unit);
- if (!success)
- throw new TimeoutException("Request did not complete after " + timeout + " " + unit);
- if (result.error() != null)
- throw result.error();
- return this;
- }
-
- /**
- * Get the offset for the given message. This method will block until the request is complete and will throw an
- * exception if the request fails.
- * @return The offset
- */
- public long offset() {
- await();
- return this.result.baseOffset() + this.relativeOffset;
- }
-
- /**
- * Check if the request is complete without blocking
- */
- public boolean completed() {
- return this.result.completed();
- }
-
- /**
- * Block on request completion and return true if there was an error.
- */
- public boolean hasError() {
- result.await();
- return this.result.error() != null;
- }
-
- /**
- * Return the error thrown
- */
- public Exception error() {
- result.await();
- return this.result.error();
- }
-}
diff --git a/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java
new file mode 100644
index 0000000..43b4c5d
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -0,0 +1,63 @@
+package kafka.clients.producer.internals;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import kafka.clients.producer.RecordMetadata;
+
+/**
+ * The future result of a record send
+ */
+public final class FutureRecordMetadata implements Future {
+
+ private final ProduceRequestResult result;
+ private final long relativeOffset;
+
+ public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset) {
+ this.result = result;
+ this.relativeOffset = relativeOffset;
+ }
+
+ @Override
+ public boolean cancel(boolean interrupt) {
+ return false;
+ }
+
+ @Override
+ public RecordMetadata get() throws InterruptedException, ExecutionException {
+ this.result.await();
+ return valueOrError();
+ }
+
+ @Override
+ public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ boolean occurred = this.result.await(timeout, unit);
+ if (!occurred)
+ throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms.");
+ return valueOrError();
+ }
+
+ private RecordMetadata valueOrError() throws ExecutionException {
+ if (this.result.error() != null)
+ throw new ExecutionException(this.result.error());
+ else
+ return new RecordMetadata(result.topicPartition(), this.result.baseOffset() + this.relativeOffset);
+ }
+
+ public long relativeOffset() {
+ return this.relativeOffset;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return this.result.completed();
+ }
+
+}
diff --git a/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java b/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java
new file mode 100644
index 0000000..6d2188e
--- /dev/null
+++ b/clients/src/main/java/kafka/clients/producer/internals/Partitioner.java
@@ -0,0 +1,55 @@
+package kafka.clients.producer.internals;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import kafka.clients.producer.ProducerRecord;
+import kafka.common.Cluster;
+import kafka.common.PartitionInfo;
+import kafka.common.utils.Utils;
+
+/**
+ * The default partitioning strategy:
+ *
+ * - If a partition is specified in the record, use it
+ *
+ * - If no partition is specified but a key is present choose a partition based on a hash of the key
+ *
+ * - If no partition or key is present choose a partition in a round-robin fashion
+ */
+public class Partitioner {
+
+ private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
+
+ /**
+ * Compute the partition for the given record.
+ *
+ * @param record The record being sent
+ * @param cluster The current cluster metadata, used to look up the partitions for the record's topic
+ */
+ public int partition(ProducerRecord record, Cluster cluster) {
+ List partitions = cluster.partitionsFor(record.topic());
+ int numPartitions = partitions.size();
+ if (record.partition() != null) {
+ // they have given us a partition, use it
+ if (record.partition() < 0 || record.partition() >= numPartitions)
+ throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
+ + " is not in the range [0..."
+ + numPartitions
+ + "].");
+ return record.partition();
+ } else if (record.key() == null) {
+ // choose the next available node in a round-robin fashion
+ for (int i = 0; i < numPartitions; i++) {
+ int partition = Utils.abs(counter.getAndIncrement()) % numPartitions;
+ if (partitions.get(partition).leader() != null)
+ return partition;
+ }
+ // no partitions are available, give a non-available partition
+ return Utils.abs(counter.getAndIncrement()) % numPartitions;
+ } else {
+ // hash the key to choose a partition
+ return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
+ }
+ }
+
+}
diff --git a/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
index 1049b61..cdae00a 100644
--- a/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
+++ b/clients/src/main/java/kafka/clients/producer/internals/ProduceRequestResult.java
@@ -3,14 +3,13 @@ package kafka.clients.producer.internals;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import kafka.clients.producer.RecordSend;
+import kafka.clients.producer.RecordMetadata;
import kafka.common.TopicPartition;
-import kafka.common.errors.TimeoutException;
/**
* A class that models the future completion of a produce request for a single partition. There is one of these per
- * partition in a produce request and it is shared by all the {@link RecordSend} instances that are batched together for
- * the same partition in the request.
+ * partition in a produce request and it is shared by all the {@link RecordMetadata} instances that are batched together
+ * for the same partition in the request.
*/
public final class ProduceRequestResult {
@@ -25,7 +24,7 @@ public final class ProduceRequestResult {
/**
* Mark this request as complete and unblock any threads waiting on its completion.
* @param topicPartition The topic and partition to which this record set was sent was sent
- * @param baseOffset The base offset assigned to the message
+ * @param baseOffset The base offset assigned to the record
* @param error The error that occurred if there was one, or null.
*/
public void done(TopicPartition topicPartition, long baseOffset, RuntimeException error) {
@@ -38,12 +37,8 @@ public final class ProduceRequestResult {
/**
* Await the completion of this request
*/
- public void await() {
- try {
- latch.await();
- } catch (InterruptedException e) {
- throw new TimeoutException("Interrupted while waiting for request to complete.");
- }
+ public void await() throws InterruptedException {
+ latch.await();
}
/**
@@ -52,16 +47,12 @@ public final class ProduceRequestResult {
* @param unit The unit for the max time
* @return true if the request completed, false if we timed out
*/
- public boolean await(long timeout, TimeUnit unit) {
- try {
- return latch.await(timeout, unit);
- } catch (InterruptedException e) {
- throw new TimeoutException("Interrupted while waiting for request to complete.");
- }
+ public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+ return latch.await(timeout, unit);
}
/**
- * The base offset for the request (the first offset in the message set)
+ * The base offset for the request (the first offset in the record set)
*/
public long baseOffset() {
return baseOffset;
@@ -75,6 +66,13 @@ public final class ProduceRequestResult {
}
/**
+ * The topic and partition to which the record was appended
+ */
+ public TopicPartition topicPartition() {
+ return topicPartition;
+ }
+
+ /**
* Has the request completed?
*/
public boolean completed() {
diff --git a/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
index a2b536c..c22939f 100644
--- a/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java
@@ -11,7 +11,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import kafka.clients.producer.Callback;
-import kafka.clients.producer.RecordSend;
import kafka.common.TopicPartition;
import kafka.common.metrics.Measurable;
import kafka.common.metrics.MetricConfig;
@@ -67,21 +66,21 @@ public final class RecordAccumulator {
private void registerMetrics(Metrics metrics) {
metrics.addMetric("blocked_threads",
- "The number of user threads blocked waiting for buffer memory to enqueue their messages",
+ "The number of user threads blocked waiting for buffer memory to enqueue their records",
new Measurable() {
public double measure(MetricConfig config, long now) {
return free.queued();
}
});
metrics.addMetric("buffer_total_bytes",
- "The total amount of buffer memory that is available (not currently used for buffering messages).",
+ "The total amount of buffer memory that is available (not currently used for buffering records).",
new Measurable() {
public double measure(MetricConfig config, long now) {
return free.totalMemory();
}
});
metrics.addMetric("buffer_available_bytes",
- "The total amount of buffer memory that is available (not currently used for buffering messages).",
+ "The total amount of buffer memory that is available (not currently used for buffering records).",
new Measurable() {
public double measure(MetricConfig config, long now) {
return free.availableMemory();
@@ -100,7 +99,7 @@ public final class RecordAccumulator {
* @param compression The compression codec for the record
* @param callback The user-supplied callback to execute when the request is complete
*/
- public RecordSend append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
+ public FutureRecordMetadata append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
// check if we have an in-progress batch
@@ -108,9 +107,9 @@ public final class RecordAccumulator {
synchronized (dq) {
RecordBatch batch = dq.peekLast();
if (batch != null) {
- RecordSend send = batch.tryAppend(key, value, compression, callback);
- if (send != null)
- return send;
+ FutureRecordMetadata future = batch.tryAppend(key, value, compression, callback);
+ if (future != null)
+ return future;
}
}
@@ -120,19 +119,18 @@ public final class RecordAccumulator {
synchronized (dq) {
RecordBatch first = dq.peekLast();
if (first != null) {
- RecordSend send = first.tryAppend(key, value, compression, callback);
- if (send != null) {
- // somebody else found us a batch, return the one we waited for!
- // Hopefully this doesn't happen
+ FutureRecordMetadata future = first.tryAppend(key, value, compression, callback);
+ if (future != null) {
+ // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen
// often...
free.deallocate(buffer);
- return send;
+ return future;
}
}
RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds());
- RecordSend send = Utils.notNull(batch.tryAppend(key, value, compression, callback));
+ FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback));
dq.addLast(batch);
- return send;
+ return future;
}
}
diff --git a/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
index 4a536a2..633f4af 100644
--- a/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/kafka/clients/producer/internals/RecordBatch.java
@@ -4,7 +4,7 @@ import java.util.ArrayList;
import java.util.List;
import kafka.clients.producer.Callback;
-import kafka.clients.producer.RecordSend;
+import kafka.clients.producer.RecordMetadata;
import kafka.common.TopicPartition;
import kafka.common.record.CompressionType;
import kafka.common.record.MemoryRecords;
@@ -31,19 +31,20 @@ public final class RecordBatch {
}
/**
- * Append the message to the current message set and return the relative offset within that message set
+ * Append the record to the current record set and return the relative offset within that record set
*
- * @return The RecordSend corresponding to this message or null if there isn't sufficient room.
+ * @return The FutureRecordMetadata corresponding to this record or null if there isn't sufficient room.
*/
- public RecordSend tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) {
+ public FutureRecordMetadata tryAppend(byte[] key, byte[] value, CompressionType compression, Callback callback) {
if (!this.records.hasRoomFor(key, value)) {
return null;
} else {
this.records.append(0L, key, value, compression);
- RecordSend send = new RecordSend(this.recordCount++, this.produceFuture);
+ FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
if (callback != null)
- thunks.add(new Thunk(callback, send));
- return send;
+ thunks.add(new Thunk(callback, this.recordCount));
+ this.recordCount++;
+ return future;
}
}
@@ -58,7 +59,12 @@ public final class RecordBatch {
// execute callbacks
for (int i = 0; i < this.thunks.size(); i++) {
try {
- this.thunks.get(i).execute();
+ Thunk thunk = this.thunks.get(i);
+ if (exception == null)
+ thunk.callback.onCompletion(new RecordMetadata(topicPartition, this.produceFuture.baseOffset() + thunk.relativeOffset),
+ null);
+ else
+ thunk.callback.onCompletion(null, exception);
} catch (Exception e) {
e.printStackTrace();
}
@@ -70,15 +76,11 @@ public final class RecordBatch {
*/
final private static class Thunk {
final Callback callback;
- final RecordSend send;
+ final long relativeOffset;
- public Thunk(Callback callback, RecordSend send) {
+ public Thunk(Callback callback, long relativeOffset) {
this.callback = callback;
- this.send = send;
- }
-
- public void execute() {
- this.callback.onCompletion(this.send);
+ this.relativeOffset = relativeOffset;
}
}
}
\ No newline at end of file
diff --git a/clients/src/main/java/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/kafka/clients/producer/internals/Sender.java
index effeb9c..5ac487b 100644
--- a/clients/src/main/java/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/kafka/clients/producer/internals/Sender.java
@@ -283,7 +283,8 @@ public class Sender implements Runnable {
private void handleMetadataResponse(Struct body, long now) {
this.metadataFetchInProgress = false;
- this.metadata.update(ProtoUtils.parseMetadataResponse(body), now);
+ Cluster cluster = ProtoUtils.parseMetadataResponse(body);
+ this.metadata.update(cluster, now);
}
/**
@@ -377,7 +378,7 @@ public class Sender implements Runnable {
buffer.flip();
Struct part = topicData.instance("data")
.set("partition", parts.get(i).topicPartition.partition())
- .set("message_set", buffer);
+ .set("record_set", buffer);
partitionData[i] = part;
}
topicData.set("data", partitionData);
diff --git a/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
index 7331b73..973eb5e 100644
--- a/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
@@ -7,47 +7,42 @@ import kafka.clients.producer.Callback;
import kafka.clients.producer.KafkaProducer;
import kafka.clients.producer.ProducerConfig;
import kafka.clients.producer.ProducerRecord;
-import kafka.clients.producer.RecordSend;
-import kafka.common.ByteSerialization;
+import kafka.clients.producer.RecordMetadata;
+import kafka.common.record.Records;
public class ProducerPerformance {
public static void main(String[] args) throws Exception {
if (args.length != 3) {
- System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url num_messages message_size");
+ System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url num_records record_size");
System.exit(1);
}
String url = args[0];
- int numMessages = Integer.parseInt(args[1]);
- int messageSize = Integer.parseInt(args[2]);
+ int numRecords = Integer.parseInt(args[1]);
+ int recordSize = Integer.parseInt(args[2]);
Properties props = new Properties();
props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, "1");
props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url);
props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
- props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteSerialization.class.getName());
- props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteSerialization.class.getName());
KafkaProducer producer = new KafkaProducer(props);
Callback callback = new Callback() {
- public void onCompletion(RecordSend send) {
- try {
- send.offset();
- } catch (Exception e) {
+ public void onCompletion(RecordMetadata metadata, Exception e) {
+ if (e != null)
e.printStackTrace();
- }
}
};
- byte[] payload = new byte[messageSize];
+ byte[] payload = new byte[recordSize];
Arrays.fill(payload, (byte) 1);
ProducerRecord record = new ProducerRecord("test", payload);
long start = System.currentTimeMillis();
long maxLatency = -1L;
long totalLatency = 0;
int reportingInterval = 1000000;
- for (int i = 0; i < numMessages; i++) {
+ for (int i = 0; i < numRecords; i++) {
long sendStart = System.currentTimeMillis();
- producer.send(record, null);
+ producer.send(record, callback);
long sendEllapsed = System.currentTimeMillis() - sendStart;
maxLatency = Math.max(maxLatency, sendEllapsed);
totalLatency += sendEllapsed;
@@ -61,9 +56,9 @@ public class ProducerPerformance {
}
}
long ellapsed = System.currentTimeMillis() - start;
- double msgsSec = 1000.0 * numMessages / (double) ellapsed;
- double mbSec = msgsSec * messageSize / (1024.0 * 1024.0);
- System.out.printf("%d messages sent in %d ms ms. %.2f messages per second (%.2f mb/sec).", numMessages, ellapsed, msgsSec, mbSec);
+ double msgsSec = 1000.0 * numRecords / (double) ellapsed;
+ double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0);
+ System.out.printf("%d records sent in %d ms. %.2f records per second (%.2f mb/sec).", numRecords, ellapsed, msgsSec, mbSec);
producer.close();
}
diff --git a/clients/src/main/java/kafka/common/ByteSerialization.java b/clients/src/main/java/kafka/common/ByteSerialization.java
deleted file mode 100644
index eca69f1..0000000
--- a/clients/src/main/java/kafka/common/ByteSerialization.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package kafka.common;
-
-/**
- * A serialization implementation that just retains the provided byte array unchanged
- */
-public class ByteSerialization implements Serializer, Deserializer {
-
- @Override
- public Object fromBytes(byte[] bytes) {
- return bytes;
- }
-
- @Override
- public byte[] toBytes(Object o) {
- return (byte[]) o;
- }
-
-}
diff --git a/clients/src/main/java/kafka/common/Cluster.java b/clients/src/main/java/kafka/common/Cluster.java
index d0acd8d..8d045d5 100644
--- a/clients/src/main/java/kafka/common/Cluster.java
+++ b/clients/src/main/java/kafka/common/Cluster.java
@@ -12,13 +12,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import kafka.common.utils.Utils;
/**
- * A representation of the nodes, topics, and partitions in the Kafka cluster
+ * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
*/
public final class Cluster {
private final AtomicInteger counter = new AtomicInteger(0);
private final List nodes;
- private final Map nodesById;
private final Map partitionsByTopicPartition;
private final Map> partitionsByTopic;
@@ -28,22 +27,28 @@ public final class Cluster {
* @param partitions Information about a subset of the topic-partitions this cluster hosts
*/
public Cluster(Collection nodes, Collection partitions) {
- this.nodes = new ArrayList(nodes);
- this.nodesById = new HashMap(this.nodes.size());
- this.partitionsByTopicPartition = new HashMap(partitions.size());
- this.partitionsByTopic = new HashMap>(partitions.size());
+ // make a randomized, unmodifiable copy of the nodes
+ List copy = new ArrayList(nodes);
+ Collections.shuffle(copy);
+ this.nodes = Collections.unmodifiableList(copy);
- Collections.shuffle(this.nodes);
- for (Node n : this.nodes)
- this.nodesById.put(n.id(), n);
+ // index the partitions by topic/partition for quick lookup
+ this.partitionsByTopicPartition = new HashMap(partitions.size());
for (PartitionInfo p : partitions)
this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
+
+ // index the partitions by topic and make the lists unmodifiable so we can handle them out in
+ // user-facing apis without risk of the client modifying the contents
+ HashMap> parts = new HashMap>();
for (PartitionInfo p : partitions) {
- if (!this.partitionsByTopic.containsKey(p.topic()))
- this.partitionsByTopic.put(p.topic(), new ArrayList());
- List ps = this.partitionsByTopic.get(p.topic());
+ if (!parts.containsKey(p.topic()))
+ parts.put(p.topic(), new ArrayList());
+ List ps = parts.get(p.topic());
ps.add(p);
}
+ this.partitionsByTopic = new HashMap>(parts.size());
+ for (Map.Entry> entry : parts.entrySet())
+ this.partitionsByTopic.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
}
/**
@@ -67,6 +72,13 @@ public final class Cluster {
}
/**
+ * @return The known set of nodes
+ */
+ public List nodes() {
+ return this.nodes;
+ }
+
+ /**
* Get the current leader for the given topic-partition
* @param topicPartition The topic and partition we want to know the leader for
* @return The node that is the leader for this topic-partition, or null if there is currently no leader
@@ -76,7 +88,16 @@ public final class Cluster {
if (info == null)
return null;
else
- return nodesById.get(info.leader());
+ return info.leader();
+ }
+
+ /**
+ * Get the metadata for the specified partition
+ * @param topicPartition The topic and partition to fetch info for
+ * @return The metadata about the given topic and partition
+ */
+ public PartitionInfo partition(TopicPartition topicPartition) {
+ return partitionsByTopicPartition.get(topicPartition);
}
/**
diff --git a/clients/src/main/java/kafka/common/Deserializer.java b/clients/src/main/java/kafka/common/Deserializer.java
deleted file mode 100644
index ad2e784..0000000
--- a/clients/src/main/java/kafka/common/Deserializer.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package kafka.common;
-
-/**
- * A class that controls how an object is turned into bytes. Classes implementing this interface will generally be
- * instantiated by the framework.
- *
- * An implementation that requires special configuration parameters can implement {@link Configurable}
- */
-public interface Deserializer {
-
- /**
- * Map a byte[] to an object
- * @param bytes The bytes for the object (can be null)
- * @return The deserialized object (can return null)
- */
- public Object fromBytes(byte[] bytes);
-
-}
diff --git a/clients/src/main/java/kafka/common/PartitionInfo.java b/clients/src/main/java/kafka/common/PartitionInfo.java
index f3f08dd..0e50ed7 100644
--- a/clients/src/main/java/kafka/common/PartitionInfo.java
+++ b/clients/src/main/java/kafka/common/PartitionInfo.java
@@ -7,11 +7,11 @@ public class PartitionInfo {
private final String topic;
private final int partition;
- private final int leader;
- private final int[] replicas;
- private final int[] inSyncReplicas;
+ private final Node leader;
+ private final Node[] replicas;
+ private final Node[] inSyncReplicas;
- public PartitionInfo(String topic, int partition, int leader, int[] replicas, int[] inSyncReplicas) {
+ public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
this.topic = topic;
this.partition = partition;
this.leader = leader;
@@ -36,14 +36,14 @@ public class PartitionInfo {
/**
- * The node id of the node currently acting as a leader for this partition or -1 if there is no leader
+ * The node currently acting as leader for this partition, or null if there is no leader
*/
- public int leader() {
+ public Node leader() {
return leader;
}
/**
* The complete set of replicas for this partition regardless of whether they are alive or up-to-date
*/
- public int[] replicas() {
+ public Node[] replicas() {
return replicas;
}
@@ -51,7 +51,7 @@ public class PartitionInfo {
* The subset of the replicas that are in sync, that is caught-up to the leader and ready to take over as leader if
* the leader should fail
*/
- public int[] inSyncReplicas() {
+ public Node[] inSyncReplicas() {
return inSyncReplicas;
}
diff --git a/clients/src/main/java/kafka/common/Serializer.java b/clients/src/main/java/kafka/common/Serializer.java
deleted file mode 100644
index 63353d8..0000000
--- a/clients/src/main/java/kafka/common/Serializer.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package kafka.common;
-
-/**
- * A class that controls how an object is turned into bytes. Classes implementing this interface will generally be
- * instantiated by the framework.
- *
- * An implementation should handle null inputs.
- *
- * An implementation that requires special configuration parameters can implement {@link Configurable}
- */
-public interface Serializer {
-
- /**
- * Translate an object into bytes. The serializer must handle null inputs, and will generally just return null in
- * this case.
- * @param o The object to serialize, can be null
- * @return The serialized bytes for the object or null
- */
- public byte[] toBytes(Object o);
-
-}
diff --git a/clients/src/main/java/kafka/common/StringSerialization.java b/clients/src/main/java/kafka/common/StringSerialization.java
deleted file mode 100644
index c0ed5ca..0000000
--- a/clients/src/main/java/kafka/common/StringSerialization.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package kafka.common;
-
-import java.io.UnsupportedEncodingException;
-import java.util.Map;
-
-/**
- * A serializer and deserializer for strings.
- *
- * This class accepts a configuration parameter string.encoding which can take the string name of any supported
- * encoding. If no encoding is specified the default will be UTF-8.
- */
-public class StringSerialization implements Serializer, Deserializer, Configurable {
-
- private final static String ENCODING_CONFIG = "string.encoding";
-
- private String encoding;
-
- public StringSerialization(String encoding) {
- super();
- this.encoding = encoding;
- }
-
- public StringSerialization() {
- this("UTF8");
- }
-
- public void configure(Map configs) {
- if (configs.containsKey(ENCODING_CONFIG))
- this.encoding = (String) configs.get(ENCODING_CONFIG);
- }
-
- @Override
- public Object fromBytes(byte[] bytes) {
- if (bytes == null) {
- return null;
- } else {
- try {
- return new String(bytes, encoding);
- } catch (UnsupportedEncodingException e) {
- throw new KafkaException(e);
- }
- }
- }
-
- @Override
- public byte[] toBytes(Object o) {
- if (o == null) {
- return null;
- } else {
- try {
- return ((String) o).getBytes(encoding);
- } catch (UnsupportedEncodingException e) {
- throw new KafkaException(e);
- }
- }
- }
-
-}
diff --git a/clients/src/main/java/kafka/common/errors/CorruptMessageException.java b/clients/src/main/java/kafka/common/errors/CorruptMessageException.java
deleted file mode 100644
index faf6234..0000000
--- a/clients/src/main/java/kafka/common/errors/CorruptMessageException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package kafka.common.errors;
-
-public class CorruptMessageException extends ApiException {
-
- private static final long serialVersionUID = 1L;
-
- public CorruptMessageException() {
- super("This message has failed it's CRC checksum or is otherwise corrupt.");
- }
-
- public CorruptMessageException(String message) {
- super(message);
- }
-
- public CorruptMessageException(Throwable cause) {
- super(cause);
- }
-
- public CorruptMessageException(String message, Throwable cause) {
- super(message, cause);
- }
-
-}
diff --git a/clients/src/main/java/kafka/common/errors/CorruptRecordException.java b/clients/src/main/java/kafka/common/errors/CorruptRecordException.java
new file mode 100644
index 0000000..492f2e3
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/CorruptRecordException.java
@@ -0,0 +1,23 @@
+package kafka.common.errors;
+
+public class CorruptRecordException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public CorruptRecordException() {
+ super("This message has failed its CRC checksum or is otherwise corrupt.");
+ }
+
+ public CorruptRecordException(String message) {
+ super(message);
+ }
+
+ public CorruptRecordException(Throwable cause) {
+ super(cause);
+ }
+
+ public CorruptRecordException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/clients/src/main/java/kafka/common/errors/MessageTooLargeException.java b/clients/src/main/java/kafka/common/errors/MessageTooLargeException.java
deleted file mode 100644
index 7417906..0000000
--- a/clients/src/main/java/kafka/common/errors/MessageTooLargeException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package kafka.common.errors;
-
-public class MessageTooLargeException extends ApiException {
-
- private static final long serialVersionUID = 1L;
-
- public MessageTooLargeException() {
- super();
- }
-
- public MessageTooLargeException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public MessageTooLargeException(String message) {
- super(message);
- }
-
- public MessageTooLargeException(Throwable cause) {
- super(cause);
- }
-
-}
diff --git a/clients/src/main/java/kafka/common/errors/RecordTooLargeException.java b/clients/src/main/java/kafka/common/errors/RecordTooLargeException.java
new file mode 100644
index 0000000..bef4293
--- /dev/null
+++ b/clients/src/main/java/kafka/common/errors/RecordTooLargeException.java
@@ -0,0 +1,23 @@
+package kafka.common.errors;
+
+public class RecordTooLargeException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public RecordTooLargeException() {
+ super();
+ }
+
+ public RecordTooLargeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public RecordTooLargeException(String message) {
+ super(message);
+ }
+
+ public RecordTooLargeException(Throwable cause) {
+ super(cause);
+ }
+
+}
diff --git a/clients/src/main/java/kafka/common/network/ByteBufferReceive.java b/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
index cb1aaae..65a7c64 100644
--- a/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
+++ b/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
@@ -33,7 +33,9 @@ public class ByteBufferReceive implements Receive {
@Override
public long readFrom(ScatteringByteChannel channel) throws IOException {
- return channel.read(buffers);
+ long read = channel.read(buffers);
+ remaining += read;
+ return read;
}
public ByteBuffer[] reify() {
diff --git a/clients/src/main/java/kafka/common/protocol/Errors.java b/clients/src/main/java/kafka/common/protocol/Errors.java
index fb1a3e5..402a6c0 100644
--- a/clients/src/main/java/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/kafka/common/protocol/Errors.java
@@ -4,9 +4,9 @@ import java.util.HashMap;
import java.util.Map;
import kafka.common.errors.ApiException;
-import kafka.common.errors.CorruptMessageException;
+import kafka.common.errors.CorruptRecordException;
import kafka.common.errors.LeaderNotAvailableException;
-import kafka.common.errors.MessageTooLargeException;
+import kafka.common.errors.RecordTooLargeException;
import kafka.common.errors.NetworkException;
import kafka.common.errors.NotLeaderForPartitionException;
import kafka.common.errors.OffsetMetadataTooLarge;
@@ -27,14 +27,14 @@ public enum Errors {
OFFSET_OUT_OF_RANGE(1,
new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
CORRUPT_MESSAGE(2,
- new CorruptMessageException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
+ new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
LEADER_NOT_AVAILABLE(5,
new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
MESSAGE_TOO_LARGE(10,
- new MessageTooLargeException("The request included a message larger than the max message size the server will accept.")),
+ new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received."));
diff --git a/clients/src/main/java/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
index 83dad53..576c24d 100644
--- a/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
+++ b/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
@@ -2,7 +2,9 @@ package kafka.common.protocol;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import kafka.common.Cluster;
import kafka.common.Node;
@@ -52,14 +54,14 @@ public class ProtoUtils {
}
public static Cluster parseMetadataResponse(Struct response) {
- List brokers = new ArrayList();
+ Map brokers = new HashMap();
Object[] brokerStructs = (Object[]) response.get("brokers");
for (int i = 0; i < brokerStructs.length; i++) {
Struct broker = (Struct) brokerStructs[i];
int nodeId = (Integer) broker.get("node_id");
String host = (String) broker.get("host");
int port = (Integer) broker.get("port");
- brokers.add(new Node(nodeId, host, port));
+ brokers.put(nodeId, new Node(nodeId, host, port));
}
List partitions = new ArrayList();
Object[] topicInfos = (Object[]) response.get("topic_metadata");
@@ -75,21 +77,21 @@ public class ProtoUtils {
if (partError == Errors.NONE.code()) {
int partition = partitionInfo.getInt("partition_id");
int leader = partitionInfo.getInt("leader");
- int[] replicas = intArray((Object[]) partitionInfo.get("replicas"));
- int[] isr = intArray((Object[]) partitionInfo.get("isr"));
- partitions.add(new PartitionInfo(topic, partition, leader, replicas, isr));
+ Node leaderNode = leader == -1 ? null : brokers.get(leader);
+ Object[] replicas = (Object[]) partitionInfo.get("replicas");
+ Node[] replicaNodes = new Node[replicas.length];
+ for (int k = 0; k < replicas.length; k++)
+ replicaNodes[k] = brokers.get(replicas[k]);
+ Object[] isr = (Object[]) partitionInfo.get("isr");
+ Node[] isrNodes = new Node[isr.length];
+ for (int k = 0; k < isr.length; k++)
+ isrNodes[k] = brokers.get(isr[k]);
+ partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
}
}
}
}
- return new Cluster(brokers, partitions);
- }
-
- private static int[] intArray(Object[] ints) {
- int[] copy = new int[ints.length];
- for (int i = 0; i < ints.length; i++)
- copy[i] = (Integer) ints[i];
- return copy;
+ return new Cluster(brokers.values(), partitions);
}
}
diff --git a/clients/src/main/java/kafka/common/protocol/Protocol.java b/clients/src/main/java/kafka/common/protocol/Protocol.java
index e191d6a..49b60aa 100644
--- a/clients/src/main/java/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/kafka/common/protocol/Protocol.java
@@ -66,7 +66,7 @@ public class Protocol {
public static Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
- new Field("message_set", BYTES)))));
+ new Field("record_set", BYTES)))));
public static Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
INT16,
diff --git a/clients/src/main/java/kafka/common/record/MemoryRecords.java b/clients/src/main/java/kafka/common/record/MemoryRecords.java
index ec98226..d3f8426 100644
--- a/clients/src/main/java/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/kafka/common/record/MemoryRecords.java
@@ -48,7 +48,7 @@ public class MemoryRecords implements Records {
return this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value);
}
- /** Write the messages in this set to the given channel */
+ /** Write the records in this set to the given channel */
public int writeTo(GatheringByteChannel channel) throws IOException {
return channel.write(buffer);
}
@@ -89,7 +89,7 @@ public class MemoryRecords implements Records {
long offset = buffer.getLong();
int size = buffer.getInt();
if (size < 0)
- throw new IllegalStateException("Message with size " + size);
+ throw new IllegalStateException("Record with size " + size);
if (buffer.remaining() < size)
return allDone();
ByteBuffer rec = buffer.slice();
diff --git a/clients/src/main/java/kafka/common/record/Record.java b/clients/src/main/java/kafka/common/record/Record.java
index 835a0a4..b89accf 100644
--- a/clients/src/main/java/kafka/common/record/Record.java
+++ b/clients/src/main/java/kafka/common/record/Record.java
@@ -162,7 +162,7 @@ public final class Record {
}
/**
- * Throw an InvalidMessageException if isValid is false for this record
+ * Throw an InvalidRecordException if isValid is false for this record
*/
public void ensureValid() {
if (!isValid())
@@ -260,7 +260,7 @@ public final class Record {
}
public String toString() {
- return String.format("Message(magic = %d, attributes = %d, crc = %d, key = %d bytes, value = %d bytes)",
+ return String.format("Record(magic = %d, attributes = %d, crc = %d, key = %d bytes, value = %d bytes)",
magic(),
attributes(),
checksum(),
diff --git a/clients/src/test/java/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/kafka/clients/producer/MetadataTest.java
index 68e4bd7..dd45209 100644
--- a/clients/src/test/java/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/kafka/clients/producer/MetadataTest.java
@@ -1,12 +1,10 @@
package kafka.clients.producer;
-import static java.util.Arrays.asList;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import kafka.clients.producer.internals.Metadata;
import kafka.common.Cluster;
-import kafka.common.Node;
-import kafka.common.PartitionInfo;
+import kafka.test.TestUtils;
import org.junit.Test;
@@ -30,7 +28,7 @@ public class MetadataTest {
Thread t2 = asyncFetch(topic);
assertTrue("Awaiting update", t1.isAlive());
assertTrue("Awaiting update", t2.isAlive());
- metadata.update(clusterWith(topic), time);
+ metadata.update(TestUtils.singletonCluster(topic, 1), time);
t1.join();
t2.join();
assertFalse("No update needed.", metadata.needsUpdate(time));
@@ -38,10 +36,6 @@ public class MetadataTest {
assertTrue("Update needed due to stale metadata.", metadata.needsUpdate(time));
}
- private Cluster clusterWith(String topic) {
- return new Cluster(asList(new Node(0, "localhost", 1969)), asList(new PartitionInfo(topic, 0, 0, new int[0], new int[0])));
- }
-
private Thread asyncFetch(final String topic) {
Thread thread = new Thread() {
public void run() {
diff --git a/clients/src/test/java/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
index 61929a4..24b132f 100644
--- a/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
@@ -5,62 +5,59 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import kafka.common.Cluster;
-import kafka.common.Node;
-import kafka.common.PartitionInfo;
-import kafka.common.Serializer;
-import kafka.common.StringSerialization;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.junit.Test;
public class MockProducerTest {
+ private String topic = "topic";
+
@Test
- public void testAutoCompleteMock() {
+ public void testAutoCompleteMock() throws Exception {
MockProducer producer = new MockProducer(true);
- ProducerRecord record = new ProducerRecord("topic", "key", "value");
- RecordSend send = producer.send(record);
- assertTrue("Send should be immediately complete", send.completed());
- assertFalse("Send should be successful", send.hasError());
- assertEquals("Offset should be 0", 0, send.offset());
+ ProducerRecord record = new ProducerRecord(topic, "key".getBytes(), "value".getBytes());
+ Future metadata = producer.send(record);
+ assertTrue("Send should be immediately complete", metadata.isDone());
+ assertFalse("Send should be successful", isError(metadata));
+ assertEquals("Offset should be 0", 0, metadata.get().offset());
+ assertEquals(topic, metadata.get().topic());
assertEquals("We should have the record in our history", asList(record), producer.history());
producer.clear();
assertEquals("Clear should erase our history", 0, producer.history().size());
}
- public void testManualCompletion() {
+ @Test
+ public void testManualCompletion() throws Exception {
MockProducer producer = new MockProducer(false);
- ProducerRecord record1 = new ProducerRecord("topic", "key1", "value1");
- ProducerRecord record2 = new ProducerRecord("topic", "key2", "value2");
- RecordSend send1 = producer.send(record1);
- assertFalse("Send shouldn't have completed", send1.completed());
- RecordSend send2 = producer.send(record2);
- assertFalse("Send shouldn't have completed", send2.completed());
+ ProducerRecord record1 = new ProducerRecord("topic", "key1".getBytes(), "value1".getBytes());
+ ProducerRecord record2 = new ProducerRecord("topic", "key2".getBytes(), "value2".getBytes());
+ Future md1 = producer.send(record1);
+ assertFalse("Send shouldn't have completed", md1.isDone());
+ Future md2 = producer.send(record2);
+ assertFalse("Send shouldn't have completed", md2.isDone());
assertTrue("Complete the first request", producer.completeNext());
- assertFalse("Requst should be successful", send1.hasError());
- assertFalse("Second request still incomplete", send2.completed());
+ assertFalse("Request should be successful", isError(md1));
+ assertFalse("Second request still incomplete", md2.isDone());
IllegalArgumentException e = new IllegalArgumentException("blah");
assertTrue("Complete the second request with an error", producer.errorNext(e));
try {
- send2.await();
+ md2.get();
fail("Expected error to be thrown");
- } catch (IllegalArgumentException err) {
- // this is good
+ } catch (ExecutionException err) {
+ assertEquals(e, err.getCause());
}
assertFalse("No more requests to complete", producer.completeNext());
}
- public void testSerializationAndPartitioning() {
- Cluster cluster = new Cluster(asList(new Node(0, "host", -1)), asList(new PartitionInfo("topic",
- 0,
- 0,
- new int[] { 0 },
- new int[] { 0 })));
- Serializer serializer = new StringSerialization();
- Partitioner partitioner = new DefaultPartitioner();
- MockProducer producer = new MockProducer(serializer, serializer, partitioner, cluster, true);
- ProducerRecord record = new ProducerRecord("topic", "key", "value");
- RecordSend send = producer.send(record);
- assertTrue("Send should be immediately complete", send.completed());
+ private boolean isError(Future> future) {
+ try {
+ future.get();
+ return false;
+ } catch (Exception e) {
+ return true;
+ }
}
}
diff --git a/clients/src/test/java/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/kafka/clients/producer/PartitionerTest.java
new file mode 100644
index 0000000..c18da76
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/producer/PartitionerTest.java
@@ -0,0 +1,54 @@
+package kafka.clients.producer;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import kafka.clients.producer.internals.Partitioner;
+import kafka.common.Cluster;
+import kafka.common.Node;
+import kafka.common.PartitionInfo;
+
+import org.junit.Test;
+
+public class PartitionerTest {
+
+ private byte[] key = "key".getBytes();
+ private byte[] value = "value".getBytes();
+ private Partitioner partitioner = new Partitioner();
+ private Node node0 = new Node(0, "localhost", 99);
+ private Node node1 = new Node(1, "localhost", 100);
+ private Node node2 = new Node(2, "localhost", 101);
+ private Node[] nodes = new Node[] { node0, node1, node2 };
+ private String topic = "test";
+ private List partitions = asList(new PartitionInfo(topic, 0, node0, nodes, nodes),
+ new PartitionInfo(topic, 1, node1, nodes, nodes),
+ new PartitionInfo(topic, 2, null, nodes, nodes));
+ private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
+
+ @Test
+ public void testUserSuppliedPartitioning() {
+ assertEquals("If the user supplies a partition we should use it.",
+ 0,
+ partitioner.partition(new ProducerRecord("test", 0, key, value), cluster));
+ }
+
+ @Test
+ public void testKeyPartitionIsStable() {
+ int partition = partitioner.partition(new ProducerRecord("test", key, value), cluster);
+ assertEquals("Same key should yield same partition",
+ partition,
+ partitioner.partition(new ProducerRecord("test", key, "value2".getBytes()), cluster));
+ }
+
+ @Test
+ public void testRoundRobinWithDownNode() {
+ for (int i = 0; i < partitions.size(); i++) {
+ int part = partitioner.partition(new ProducerRecord("test", value), cluster);
+ assertTrue("We should never choose a leader-less partition in round robin", part >= 0 && part < 2);
+
+ }
+ }
+}
diff --git a/clients/src/test/java/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
index f8fd14b..804c57b 100644
--- a/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
+++ b/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
@@ -5,12 +5,14 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import kafka.clients.producer.internals.FutureRecordMetadata;
import kafka.clients.producer.internals.ProduceRequestResult;
import kafka.common.TopicPartition;
-import kafka.common.errors.CorruptMessageException;
-import kafka.common.errors.TimeoutException;
+import kafka.common.errors.CorruptRecordException;
import org.junit.Test;
@@ -24,37 +26,37 @@ public class RecordSendTest {
* Test that waiting on a request that never completes times out
*/
@Test
- public void testTimeout() {
+ public void testTimeout() throws Exception {
ProduceRequestResult request = new ProduceRequestResult();
- RecordSend send = new RecordSend(relOffset, request);
- assertFalse("Request is not completed", send.completed());
+ FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset);
+ assertFalse("Request is not completed", future.isDone());
try {
- send.await(5, TimeUnit.MILLISECONDS);
+ future.get(5, TimeUnit.MILLISECONDS);
fail("Should have thrown exception.");
} catch (TimeoutException e) { /* this is good */
}
request.done(topicPartition, baseOffset, null);
- assertTrue(send.completed());
- assertEquals(baseOffset + relOffset, send.offset());
+ assertTrue(future.isDone());
+ assertEquals(baseOffset + relOffset, future.get().offset());
}
/**
* Test that an asynchronous request will eventually throw the right exception
*/
- @Test(expected = CorruptMessageException.class)
- public void testError() {
- RecordSend send = new RecordSend(relOffset, asyncRequest(baseOffset, new CorruptMessageException(), 50L));
- send.await();
+ @Test(expected = ExecutionException.class)
+ public void testError() throws Exception {
+ FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L), relOffset);
+ future.get();
}
/**
* Test that an asynchronous request will eventually return the right offset
*/
@Test
- public void testBlocking() {
- RecordSend send = new RecordSend(relOffset, asyncRequest(baseOffset, null, 50L));
- assertEquals(baseOffset + relOffset, send.offset());
+ public void testBlocking() throws Exception {
+ FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L), relOffset);
+ assertEquals(baseOffset + relOffset, future.get().offset());
}
/* create a new request result that will be completed after the given timeout */
diff --git a/clients/src/test/java/kafka/clients/producer/SenderTest.java b/clients/src/test/java/kafka/clients/producer/SenderTest.java
index 73f1aba..8788095 100644
--- a/clients/src/test/java/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/kafka/clients/producer/SenderTest.java
@@ -1,17 +1,15 @@
package kafka.clients.producer;
-import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
+import java.util.concurrent.Future;
import kafka.clients.producer.internals.Metadata;
import kafka.clients.producer.internals.RecordAccumulator;
import kafka.clients.producer.internals.Sender;
import kafka.common.Cluster;
-import kafka.common.Node;
-import kafka.common.PartitionInfo;
import kafka.common.TopicPartition;
import kafka.common.metrics.Metrics;
import kafka.common.network.NetworkReceive;
@@ -24,6 +22,7 @@ import kafka.common.requests.RequestSend;
import kafka.common.requests.ResponseHeader;
import kafka.common.utils.MockTime;
import kafka.test.MockSelector;
+import kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -34,11 +33,7 @@ public class SenderTest {
private MockSelector selector = new MockSelector(time);
private int batchSize = 16 * 1024;
private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
- private Cluster cluster = new Cluster(asList(new Node(0, "localhost", 1969)), asList(new PartitionInfo("test",
- 0,
- 0,
- new int[0],
- new int[0])));
+ private Cluster cluster = TestUtils.singletonCluster("test", 1);
private Metrics metrics = new Metrics(time);
private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, false, metrics, time);
private Sender sender = new Sender(selector, metadata, this.accumulator, "", 1024 * 1024, 0L, (short) -1, 10000, time);
@@ -51,7 +46,7 @@ public class SenderTest {
@Test
public void testSimple() throws Exception {
TopicPartition tp = new TopicPartition("test", 0);
- RecordSend send = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+ Future future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
sender.run(time.milliseconds());
assertEquals("We should have connected", 1, selector.connected().size());
selector.clear();
@@ -67,8 +62,8 @@ public class SenderTest {
offset,
Errors.NONE.code()));
sender.run(time.milliseconds());
- assertTrue("Request should be completed", send.completed());
- assertEquals(offset, send.offset());
+ assertTrue("Request should be completed", future.isDone());
+ assertEquals(offset, future.get().offset());
}
private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) {
diff --git a/clients/src/test/java/kafka/test/TestUtils.java b/clients/src/test/java/kafka/test/TestUtils.java
index a2ef3a2..90c6850 100644
--- a/clients/src/test/java/kafka/test/TestUtils.java
+++ b/clients/src/test/java/kafka/test/TestUtils.java
@@ -1,10 +1,18 @@
package kafka.test;
+import static java.util.Arrays.asList;
+
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
+import kafka.common.Cluster;
+import kafka.common.Node;
+import kafka.common.PartitionInfo;
+
/**
* Helper functions for writing unit tests
*/
@@ -20,6 +28,20 @@ public class TestUtils {
public static final Random seededRandom = new Random(192348092834L);
public static final Random random = new Random();
+ public static Cluster singletonCluster(String topic, int partitions) {
+ return clusterWith(1, topic, partitions);
+ }
+
+ public static Cluster clusterWith(int nodes, String topic, int partitions) {
+ Node[] ns = new Node[nodes];
+ for (int i = 0; i < nodes; i++)
+ ns[i] = new Node(i, "localhost", 1969);
+ List parts = new ArrayList();
+ for (int i = 0; i < partitions; i++)
+ parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
+ return new Cluster(asList(ns), parts);
+ }
+
/**
* Choose a number of random available ports
*/