diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java index 29658d4..7c156ef 100644 --- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -41,14 +41,26 @@ public interface KafkaClient { public boolean ready(Node node, long now); /** - * Initiate the sending of the given requests and return any completed responses. Requests can only be sent on ready - * connections. - * @param requests The requests to send + * Queue up the given request for sending. Requests can only be sent on ready connections. + * @param request The request + * @param now The current time + */ + public void send(ClientRequest request, long now); + + /** + * Do actual reads and writes from sockets. * @param timeout The maximum amount of time to wait for responses in ms * @param now The current time in ms * @throws IllegalStateException If a request is sent to an unready node */ - public List poll(List requests, long timeout, long now); + public List poll(long timeout, long now); + + /** + * Complete all in-flight requests + * @param now The current time in ms + * @return All requests that complete during this time period. + */ + public List completeAll(long now); /** * Choose the node with the fewest outstanding requests. This method will prefer a node with an existing connection, @@ -65,6 +77,12 @@ public interface KafkaClient { public int inFlightRequestCount(); /** + * Get the total in-flight requests for a particular node + * @param nodeId The id of the node + */ + public int inFlightRequestCount(int nodeId); + + /** * Generate a request header for the next request * @param key The API key of the request */ diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index d8f9ce6..d082edb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -116,7 +116,7 @@ public class NetworkClient implements KafkaClient { /** * Check if the node with the given id is ready to send more requests. - * @param node The given node id + * @param node The node * @param now The current time in ms * @return true if the node is ready */ @@ -124,7 +124,8 @@ public class NetworkClient implements KafkaClient { public boolean isReady(Node node, long now) { int nodeId = node.id(); if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0) - // if we need to update our metadata now declare all requests unready to make metadata requests first priority + // if we need to update our metadata now declare all requests unready to make metadata requests first + // priority return false; else // otherwise we are ready if we are connected and can send more requests @@ -140,35 +141,34 @@ public class NetworkClient implements KafkaClient { } /** - * Initiate the given requests and check for any new responses, waiting up to the specified time. Requests can only - * be sent for ready nodes. - * @param requests The requests to initiate + * Queue up the given request for sending. Requests can only be sent out to ready nodes. 
+ * @param request The request + * @param now The current time + */ + public void send(ClientRequest request, long now) { + int nodeId = request.request().destination(); + if (!isSendable(nodeId)) + throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); + + this.inFlightRequests.add(request); + selector.send(request.request()); + } + + /** + * Do actual reads and writes to sockets. * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately * @param now The current time in milliseconds * @return The list of responses received */ @Override - public List poll(List requests, long timeout, long now) { - List sends = new ArrayList(); - - for (int i = 0; i < requests.size(); i++) { - ClientRequest request = requests.get(i); - int nodeId = request.request().destination(); - if (!isSendable(nodeId)) - throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); - - this.inFlightRequests.add(request); - sends.add(request.request()); - } - + public List poll(long timeout, long now) { // should we update our metadata? long metadataTimeout = metadata.timeToNextUpdate(now); if (!this.metadataFetchInProgress && metadataTimeout == 0) - maybeUpdateMetadata(sends, now); - + maybeUpdateMetadata(now); // do the I/O try { - this.selector.poll(Math.min(timeout, metadataTimeout), sends); + this.selector.poll(Math.min(timeout, metadataTimeout)); } catch (IOException e) { log.error("Unexpected error during I/O in producer network thread", e); } @@ -183,6 +183,32 @@ public class NetworkClient implements KafkaClient { } /** + * Await all the outstanding responses for requests on the given connection + * @param node The node to block on + * @param now The current time in ms + * @return All the collected responses + */ + public List completeAll(int node, long now) { + this.selector.muteAll(); + this.selector.unmute(node); + List responses = new ArrayList(); + while (inFlightRequestCount(node) > 0) + responses.addAll(poll(Integer.MAX_VALUE, now)); + this.selector.unmuteAll(); + return responses; + } + + /** + * Wait for all outstanding requests to complete. 
+ */ + public List completeAll(long now) { + List responses = new ArrayList(); + while (inFlightRequestCount() > 0) + responses.addAll(poll(Integer.MAX_VALUE, now)); + return responses; + } + + /** * Get the number of in-flight requests */ @Override @@ -191,6 +217,14 @@ public class NetworkClient implements KafkaClient { } /** + * Get the number of in-flight requests for a given node + */ + @Override + public int inFlightRequestCount(int nodeId) { + return this.inFlightRequests.inFlightRequestCount(nodeId); + } + + /** * Generate a request header for the given API key * @param key The api key * @return A request header with the appropriate client id and correlation id @@ -240,7 +274,10 @@ public class NetworkClient implements KafkaClient { found = node; } } - + if (found == null) + log.trace("Could not pick a viable least loaded node"); + else + log.trace("Picked {} as the least loaded node", found); return found; } @@ -350,7 +387,7 @@ public class NetworkClient implements KafkaClient { /** * Add a metadata request to the list of sends if we can make one */ - private void maybeUpdateMetadata(List sends, long now) { + private void maybeUpdateMetadata(long now) { Node node = this.leastLoadedNode(now); if (node == null) { log.debug("Give up sending metadata request since no node is available"); @@ -363,11 +400,12 @@ public class NetworkClient implements KafkaClient { this.metadataFetchInProgress = true; ClientRequest metadataRequest = metadataRequest(now, node.id(), topics); log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); - sends.add(metadataRequest.request()); + this.selector.send(metadataRequest.request()); this.inFlightRequests.add(metadataRequest); } else if (connectionStates.canConnect(node.id(), now)) { // we don't have a connection to this node right now, make one - log.debug("Give up sending metadata request to node {} since it is either not connected or cannot have more in flight requests", node.id()); + log.debug("Give up sending metadata request to node {} since it is either not connected or cannot have more in flight requests", + node.id()); initiateConnect(node, now); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 227f564..8d6a0dd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -9,11 +9,12 @@ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. -*/ + */ package org.apache.kafka.clients.consumer; import java.io.Closeable; import java.util.Collection; +import java.util.List; import java.util.Map; import org.apache.kafka.common.Metric; @@ -26,92 +27,98 @@ import org.apache.kafka.common.TopicPartition; public interface Consumer extends Closeable { /** - * Incrementally subscribe to the given list of topics. This API is mutually exclusive to - * {@link #subscribe(TopicPartition...) subscribe(partitions)} + * Incrementally subscribe to the given list of topics. This API is mutually exclusive to + * {@link #subscribe(TopicPartition...) 
subscribe(partitions)} * @param topics A variable list of topics that the consumer subscribes to - */ - public void subscribe(String...topics); + */ + public void subscribe(String... topics); /** - * Incrementally subscribes to a specific topic and partition. This API is mutually exclusive to + * Incrementally subscribes to a specific topic and partition. This API is mutually exclusive to * {@link #subscribe(String...) subscribe(topics)} * @param partitions Partitions to subscribe to - */ + */ public void subscribe(TopicPartition... partitions); /** - * Unsubscribe from the specific topics. Messages for this topic will not be returned from the next {@link #poll(long) poll()} - * onwards. This should be used in conjunction with {@link #subscribe(String...) subscribe(topics)}. It is an error to - * unsubscribe from a topic that was never subscribed to using {@link #subscribe(String...) subscribe(topics)} + * Unsubscribe from the specific topics. Messages for this topic will not be returned from the next + * {@link #poll(long) poll()} onwards. This should be used in conjunction with {@link #subscribe(String...) + * subscribe(topics)}. It is an error to unsubscribe from a topic that was never subscribed to using + * {@link #subscribe(String...) subscribe(topics)} * @param topics Topics to unsubscribe from */ public void unsubscribe(String... topics); /** - * Unsubscribe from the specific topic partitions. Messages for these partitions will not be returned from the next - * {@link #poll(long) poll()} onwards. This should be used in conjunction with - * {@link #subscribe(TopicPartition...) subscribe(topic, partitions)}. It is an error to - * unsubscribe from a partition that was never subscribed to using {@link #subscribe(TopicPartition...) subscribe(partitions)} + * Unsubscribe from the specific topic partitions. Messages for these partitions will not be returned from the next + * {@link #poll(long) poll()} onwards. This should be used in conjunction with {@link #subscribe(TopicPartition...) + * subscribe(topic, partitions)}. It is an error to unsubscribe from a partition that was never subscribed to using + * {@link #subscribe(TopicPartition...) subscribe(partitions)} * @param partitions Partitions to unsubscribe from */ public void unsubscribe(TopicPartition... partitions); - + /** * Fetches data for the subscribed list of topics and partitions - * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely. Must not be negative - * @return Map of topic to records for the subscribed topics and partitions as soon as data is available for a topic partition. Availability - * of data is controlled by {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} and {@link ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG}. - * If no data is available for timeout ms, returns an empty list + * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits + * indefinitely. Must not be negative + * @return Map of topic to records for the subscribed topics and partitions as soon as data is available for a topic + * partition. Availability of data is controlled by {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} and + * {@link ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG}. If no data is available for timeout ms, returns an empty + * list */ - public Map poll(long timeout); + public List poll(long timeout); /** * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions. 
- * @param sync If true, the commit should block until the consumer receives an acknowledgment - * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null - * if the sync flag is set to false + * @param sync If true, the commit should block until the consumer receives an acknowledgment + * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. + * Returns null if the sync flag is set to false */ public OffsetMetadata commit(boolean sync); /** * Commits the specified offsets for the specified list of topics and partitions to Kafka. * @param offsets The map of offsets to commit for the given topic partitions - * @param sync If true, commit will block until the consumer receives an acknowledgment - * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null - * if the sync flag is set to false. + * @param sync If true, commit will block until the consumer receives an acknowledgment + * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. + * Returns null if the sync flag is set to false. */ public OffsetMetadata commit(Map offsets, boolean sync); - + /** - * Overrides the fetch positions that the consumer will use on the next fetch request. If the consumer subscribes to a list of topics - * using {@link #subscribe(String...) subscribe(topics)}, an exception will be thrown if the specified topic partition is not owned by - * the consumer. + * Overrides the fetch positions that the consumer will use on the next fetch request. If the consumer subscribes to + * a list of topics using {@link #subscribe(String...) subscribe(topics)}, an exception will be thrown if the + * specified topic partition is not owned by the consumer. * @param offsets The map of fetch positions per topic and partition */ - public void seek(Map offsets); + public void seek(TopicPartition partition, long offset); /** - * Returns the fetch position of the next message for the specified topic partition to be used on the next {@link #poll(long) poll()} + * Returns the fetch position of the next message for the specified topic partition to be used on the next + * {@link #poll(long) poll()} * @param partitions Partitions for which the fetch position will be returned - * @return The position from which data will be fetched for the specified partition on the next {@link #poll(long) poll()} + * @return The position from which data will be fetched for the specified partition on the next {@link #poll(long) + * poll()} */ public Map position(Collection partitions); - + /** - * Fetches the last committed offsets for the input list of partitions + * Fetches the last committed offsets for the input list of partitions * @param partitions The list of partitions to return the last committed offset for - * @return The list of offsets for the specified list of partitions + * @return The list of offsets for the specified list of partitions */ public Map committed(Collection partitions); - + /** * Fetches offsets before a certain timestamp - * @param timestamp The unix timestamp. Value -1 indicates earliest available timestamp. Value -2 indicates latest available timestamp. + * @param timestamp The unix timestamp. Value -1 indicates earliest available timestamp. Value -2 indicates latest + * available timestamp. 
* @param partitions The list of partitions for which the offsets are returned * @return The offsets for messages that were written to the server before the specified timestamp. */ public Map offsetsBeforeTime(long timestamp, Collection partitions); - + /** * Return a map of metrics maintained by the consumer */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 46efc0c..2ab6c46 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -106,6 +106,10 @@ public class ConsumerConfig extends AbstractConfig { */ public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; + /** metadata.max.age.ms */ + public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms"; + private static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any " + " partition leadership changes to proactively discover any new brokers or partitions."; + /** * The total memory used by the consumer to buffer records received from the server. This config is meant to control * the consumer's memory usage, so it is the size of the global fetch buffer that will be shared across all partitions. @@ -121,6 +125,14 @@ public class ConsumerConfig extends AbstractConfig { */ public static final String FETCH_BUFFER_CONFIG = "fetch.buffer.bytes"; + /** send.buffer.bytes */ + public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes"; + private static final String SEND_BUFFER_DOC = "The size of the TCP send buffer to use when sending data"; + + /** receive.buffer.bytes */ + public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes"; + private static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer to use when reading data"; + /** * The id string to pass to the server when making requests. The purpose of this is to be able to track the source * of requests beyond just ip/port by allowing a logical application name to be included. @@ -128,16 +140,15 @@ public class ConsumerConfig extends AbstractConfig { public static final String CLIENT_ID_CONFIG = "client.id"; /** - * The size of the TCP send buffer to use when fetching data - */ - public static final String SOCKET_RECEIVE_BUFFER_CONFIG = "socket.receive.buffer.bytes"; - - /** * The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a * host in a tight loop. This backoff applies to all requests sent by the consumer to the broker. */ public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms"; + /** retry.backoff.ms */ + public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms"; + private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed fetch request to a given topic partition." + " This avoids repeated fetching-and-failing in a tight loop."; + /** metrics.sample.window.ms */ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms"; private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. 
For example we might maintain two samples each measured over a 30 second period. " @@ -151,23 +162,30 @@ public class ConsumerConfig extends AbstractConfig { public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics."; + /** max.in.flight.requests.per.connection */ + public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection"; + private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."; + static { /* TODO: add config docs */ config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, "blah blah") - .define(GROUP_ID_CONFIG, Type.STRING, Importance.HIGH, "blah blah") + .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, "blah blah") .define(SESSION_TIMEOUT_MS, Type.LONG, 1000, Importance.HIGH, "blah blah") .define(HEARTBEAT_FREQUENCY, Type.INT, 3, Importance.MEDIUM, "blah blah") - .define(PARTITION_ASSIGNMENT_STRATEGY, Type.STRING, Importance.MEDIUM, "blah blah") + .define(PARTITION_ASSIGNMENT_STRATEGY, Type.STRING, "blah", Importance.MEDIUM, "blah blah") .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), Importance.MEDIUM, "blah blah") + .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC) .define(ENABLE_AUTO_COMMIT_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, "blah blah") .define(AUTO_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, 5000, atLeast(0), Importance.LOW, "blah blah") .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.LOW, "blah blah") .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.LOW, "blah blah") .define(FETCH_BUFFER_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), Importance.HIGH, "blah blah") - .define(SOCKET_RECEIVE_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.LOW, "blah blah") - .define(FETCH_MIN_BYTES_CONFIG, Type.LONG, 1024, atLeast(0), Importance.HIGH, "blah blah") - .define(FETCH_MAX_WAIT_MS_CONFIG, Type.LONG, 500, atLeast(0), Importance.LOW, "blah blah") + .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, SEND_BUFFER_DOC) + .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, RECEIVE_BUFFER_DOC) + .define(FETCH_MIN_BYTES_CONFIG, Type.INT, 1024, atLeast(0), Importance.HIGH, "blah blah") + .define(FETCH_MAX_WAIT_MS_CONFIG, Type.INT, 500, atLeast(0), Importance.LOW, "blah blah") .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, "blah blah") + .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, RETRY_BACKOFF_MS_DOC) .define(AUTO_OFFSET_RESET_CONFIG, Type.STRING, "largest", Importance.MEDIUM, "blah blah") .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG, @@ -176,8 +194,13 @@ public class ConsumerConfig extends AbstractConfig { Importance.LOW, METRICS_SAMPLE_WINDOW_MS_DOC) .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC) - .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC); - + .define(METRIC_REPORTER_CLASSES_CONFIG, 
Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC) + .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, + Type.INT, + 5, + atLeast(1), + Importance.LOW, + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC); } ConsumerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java index 436d8a4..a97252f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java @@ -9,30 +9,31 @@ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. -*/ + */ package org.apache.kafka.clients.consumer; import org.apache.kafka.common.TopicPartition; /** - * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the - * record is being received and an offset that points to the record in a Kafka partition. + * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the + * record is being received and an offset that points to the record in a Kafka partition. */ public final class ConsumerRecord { - private final TopicPartition partition; + + private final TopicPartition partition; private final byte[] key; private final byte[] value; private final long offset; private volatile Exception error; - + /** * Creates a record to be received from a specified topic and partition * - * @param topic The topic this record is received from + * @param topic The topic this record is received from * @param partitionId The partition of the topic this record is received from - * @param key The key of the record, if one exists - * @param value The record contents - * @param offset The offset of this record in the corresponding Kafka partition + * @param key The key of the record, if one exists + * @param value The record contents + * @param offset The offset of this record in the corresponding Kafka partition */ public ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset) { this(topic, partitionId, key, value, offset, null); @@ -52,24 +53,24 @@ public final class ConsumerRecord { /** * Creates a record with an error code - * @param topic The topic this record is received from + * @param topic The topic this record is received from * @param partitionId The partition of the topic this record is received from - * @param error The exception corresponding to the error code returned by the server for this topic partition + * @param error The exception corresponding to the error code returned by the server for this topic partition */ public ConsumerRecord(String topic, int partitionId, Exception error) { this(topic, partitionId, null, null, -1L, error); } - + private ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset, Exception error) { if (topic == null) throw new IllegalArgumentException("Topic cannot be null"); this.partition = new TopicPartition(topic, partitionId); this.key = key; this.value = value; - this.offset = offset; + this.offset = offset; this.error = error; } - + /** * The topic this record is received from */ @@ -78,19 +79,19 @@ public final class ConsumerRecord { } /** - * 
The partition from which this record is received + * The partition from which this record is received */ public int partition() { return partition.partition(); } - + /** * The TopicPartition object containing the topic and partition */ public TopicPartition topicAndPartition() { return partition; } - + /** * The key (or null if no key is specified) * @throws Exception The exception thrown while fetching this record. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index fe93afa..a273931 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -9,10 +9,12 @@ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. -*/ + */ package org.apache.kafka.clients.consumer; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -21,33 +23,52 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.Map.Entry; -import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; -import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.internals.Metadata; +import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Metric; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.LogEntry; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.requests.ConsumerMetadataRequest; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.RequestSend; import org.apache.kafka.common.utils.ClientUtils; import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A Kafka client that consumes records from a Kafka cluster. *

- * The consumer is thread safe and should generally be shared among all threads for best performance. + * The consumer is not thread safe and should not be shared across threads.

- * The consumer is single threaded and multiplexes I/O over TCP connections to each of the brokers it - * needs to communicate with. Failure to close the consumer after use will leak these resources. + * The consumer is single threaded and multiplexes I/O over TCP connections to each of the brokers it needs to + * communicate with. Failure to close the consumer after use will leak these resources. *

Usage Examples

- * The consumer APIs offer flexibility to cover a variety of consumption use cases. Following are some examples to demonstrate the correct use of - * the available APIs. Each of the examples assumes the presence of a user implemented process() method that processes a given batch of messages - * and returns the offset of the latest processed message per partition. Note that process() is not part of the consumer API and is only used as - * a convenience method to demonstrate the different use cases of the consumer APIs. Here is a sample implementation of such a process() method. + * The consumer APIs offer flexibility to cover a variety of consumption use cases. Following are some examples to + * demonstrate the correct use of the available APIs. Each of the examples assumes the presence of a user implemented + * process() method that processes a given batch of messages and returns the offset of the latest processed message per + * partition. Note that process() is not part of the consumer API and is only used as a convenience method to + * demonstrate the different use cases of the consumer APIs. Here is a sample implementation of such a process() method. + * *
  * {@code
  * private Map process(Map records) {
@@ -69,86 +90,101 @@ import org.slf4j.LoggerFactory;
  * }
  * 
*

- * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load - * balancing and failover. This example assumes that the offsets are stored in Kafka and are automatically committed periodically, - * as controlled by the auto.commit.interval.ms config + * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for + * automatic consumer load balancing and failover. This example assumes that the offsets are stored in Kafka and are + * automatically committed periodically, as controlled by the auto.commit.interval.ms config + * *

- * {@code  
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("session.timeout.ms", "1000");
- * props.put("enable.auto.commit", "true");
- * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer consumer = new KafkaConsumer(props);
- * consumer.subscribe("foo", "bar");
- * boolean isRunning = true;
- * while(isRunning) {
- *   Map records = consumer.poll(100);
- *   process(records);
- * }
- * consumer.close();
+ * {
+ *     @code
+ *     Properties props = new Properties();
+ *     props.put("metadata.broker.list", "localhost:9092");
+ *     props.put("group.id", "test");
+ *     props.put("session.timeout.ms", "1000");
+ *     props.put("enable.auto.commit", "true");
+ *     props.put("auto.commit.interval.ms", "10000");
+ *     KafkaConsumer consumer = new KafkaConsumer(props);
+ *     consumer.subscribe("foo", "bar");
+ *     boolean isRunning = true;
+ *     while (isRunning) {
+ *         Map<String, ConsumerRecords> records = consumer.poll(100);
+ *         process(records);
+ *     }
+ *     consumer.close();
  * }
  * 
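Note that the Consumer interface changed above now declares poll() as returning a List rather than the Map shown in these examples. A minimal sketch of the same loop against the List-returning signature, assuming the element type is ConsumerRecord (the generic parameters are elided in the interface) and that handle() is a hypothetical per-record processing method, not part of the consumer API:

    while (isRunning) {
        // poll() now returns a flat list of records; element type assumed to be ConsumerRecord
        List<ConsumerRecord> records = consumer.poll(100);
        for (ConsumerRecord record : records)
            handle(record); // hypothetical per-record processing
    }
    consumer.close();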
- * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load - * balancing and failover. This example assumes that the offsets are stored in Kafka and are manually committed using - * the commit(boolean) API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed - * messages fails. Note that this method of rewinding offsets using {@link #seek(Map) seek(offsets)} is only useful for rewinding the offsets - * of the current consumer instance. As such, this will not trigger a rebalance or affect the fetch offsets for the other consumer instances. + * + * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for + * automatic consumer load balancing and failover. This example assumes that the offsets are stored in Kafka and are + * manually committed using the commit(boolean) API. This example also demonstrates rewinding the consumer's offsets if + * processing of the consumed messages fails. Note that this method of rewinding offsets using {@link #seek(Map) + * seek(offsets)} is only useful for rewinding the offsets of the current consumer instance. As such, this will not + * trigger a rebalance or affect the fetch offsets for the other consumer instances. + * *
- * {@code  
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("session.timeout.ms", "1000");
- * props.put("enable.auto.commit", "false");
- * KafkaConsumer consumer = new KafkaConsumer(props);
- * consumer.subscribe("foo", "bar");
- * int commitInterval = 100;
- * int numRecords = 0;
- * boolean isRunning = true;
- * Map consumedOffsets = new HashMap();
- * while(isRunning) {
- *     Map records = consumer.poll(100);
- *     try {
- *         Map lastConsumedOffsets = process(records);
- *         consumedOffsets.putAll(lastConsumedOffsets);
- *         numRecords += records.size();
- *         // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
- *         if(numRecords % commitInterval == 0) 
- *           consumer.commit(false);
- *     } catch(Exception e) {
+ * {
+ *     @code
+ *     Properties props = new Properties();
+ *     props.put("metadata.broker.list", "localhost:9092");
+ *     props.put("group.id", "test");
+ *     props.put("session.timeout.ms", "1000");
+ *     props.put("enable.auto.commit", "false");
+ *     KafkaConsumer consumer = new KafkaConsumer(props);
+ *     consumer.subscribe("foo", "bar");
+ *     int commitInterval = 100;
+ *     int numRecords = 0;
+ *     boolean isRunning = true;
+ *     Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ *     while (isRunning) {
+ *         Map<String, ConsumerRecords> records = consumer.poll(100);
  *         try {
- *             // rewind consumer's offsets for failed partitions
- *             // assume failedPartitions() returns the list of partitions for which the processing of the last batch of messages failed
- *             List failedPartitions = failedPartitions();   
- *             Map offsetsToRewindTo = new HashMap();
- *             for(TopicPartition failedPartition : failedPartitions) {
- *                 // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
- *                 // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
- *                 offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
- *             }
- *             // seek to new offsets only for partitions that failed the last process()
- *             consumer.seek(offsetsToRewindTo);
- *         } catch(Exception e) {  break; } // rewind failed
+ *             Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *             consumedOffsets.putAll(lastConsumedOffsets);
+ *             numRecords += records.size();
+ *             // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
+ *             if (numRecords % commitInterval == 0)
+ *                 consumer.commit(false);
+ *         } catch (Exception e) {
+ *             try {
+ *                 // rewind consumer's offsets for failed partitions
+ *                 // assume failedPartitions() returns the list of partitions for which the processing of the last batch
+ *                 // of messages failed
+ *                 List<TopicPartition> failedPartitions = failedPartitions();
+ *                 Map<TopicPartition, Long> offsetsToRewindTo = new HashMap<TopicPartition, Long>();
+ *                 for (TopicPartition failedPartition : failedPartitions) {
+ *                     // rewind to the last consumed offset for the failed partition. Since process() failed for this
+ *                     // partition, the consumed offset
+ *                     // should still be pointing to the last successfully processed offset and hence is the right offset
+ *                     // to rewind consumption to.
+ *                     offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
+ *                 }
+ *                 // seek to new offsets only for partitions that failed the last process()
+ *                 consumer.seek(offsetsToRewindTo);
 *             } catch (Exception e2) { // renamed from e to avoid shadowing the outer catch variable
+ *                 break;
+ *             } // rewind failed
+ *         }
  *     }
- * }         
- * consumer.close();
+ *     consumer.close();
  * }
  * 
*
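The rewind above still passes a map of offsets to seek(). With the per-partition signature this patch gives the Consumer interface, seek(TopicPartition partition, long offset), the same rewind can be expressed one partition at a time. A minimal sketch, reusing failedPartitions() and consumedOffsets from the example above:

    // rewind each failed partition individually using the new seek(TopicPartition, long) signature
    for (TopicPartition failedPartition : failedPartitions())
        consumer.seek(failedPartition, consumedOffsets.get(failedPartition));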

- * This example demonstrates how to rewind the offsets of the entire consumer group. It is assumed that the user has chosen to use Kafka's - * group management functionality for automatic consumer load balancing and failover. This example also assumes that the offsets are stored in - * Kafka. If group management is used, the right place to systematically rewind offsets for every consumer instance is inside the - * ConsumerRebalanceCallback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance - * and before the consumption restarts post rebalance. This is the right place to supply the newly rewound offsets to the consumer. It - * is recommended that if you foresee the requirement to ever reset the consumer's offsets in the presence of group management, that you - * always configure the consumer to use the ConsumerRebalanceCallback with a flag that protects whether or not the offset rewind logic is used. - * This method of rewinding offsets is useful if you notice an issue with your message processing after successful consumption and offset commit. - * And you would like to rewind the offsets for the entire consumer group as part of rolling out a fix to your processing logic. In this case, - * you would configure each of your consumer instances with the offset rewind configuration flag turned on and bounce each consumer instance - * in a rolling restart fashion. Each restart will trigger a rebalance and eventually all consumer instances would have rewound the offsets for - * the partitions they own, effectively rewinding the offsets for the entire consumer group. + * This example demonstrates how to rewind the offsets of the entire consumer group. It is assumed that the user has + * chosen to use Kafka's group management functionality for automatic consumer load balancing and failover. This example + * also assumes that the offsets are stored in Kafka. If group management is used, the right place to systematically + * rewind offsets for every consumer instance is inside the ConsumerRebalanceCallback. The onPartitionsAssigned + * callback is invoked after the consumer is assigned a new set of partitions on rebalance and before the + * consumption restarts post rebalance. This is the right place to supply the newly rewound offsets to the consumer. It + * is recommended that if you foresee the requirement to ever reset the consumer's offsets in the presence of group + * management, that you always configure the consumer to use the ConsumerRebalanceCallback with a flag that protects + * whether or not the offset rewind logic is used. This method of rewinding offsets is useful if you notice an issue + * with your message processing after successful consumption and offset commit. And you would like to rewind the offsets + * for the entire consumer group as part of rolling out a fix to your processing logic. In this case, you would + * configure each of your consumer instances with the offset rewind configuration flag turned on and bounce each + * consumer instance in a rolling restart fashion. Each restart will trigger a rebalance and eventually all consumer + * instances would have rewound the offsets for the partitions they own, effectively rewinding the offsets for the + * entire consumer group. + * *

  * {@code  
  * Properties props = new Properties();
@@ -195,139 +231,146 @@ import org.slf4j.LoggerFactory;
  * consumer.close();
  * }
  * 
- * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with custom offset storage. - * In this example, the assumption made is that the user chooses to store the consumer offsets outside Kafka. This requires the user to - * plugin logic for retrieving the offsets from a custom store and provide the offsets to the consumer in the ConsumerRebalanceCallback - * callback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance and - * before the consumption restarts post rebalance. This is the right place to supply offsets from a custom store to the consumer. + * + * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with + * custom offset storage. In this example, the assumption made is that the user chooses to store the consumer offsets + * outside Kafka. This requires the user to plugin logic for retrieving the offsets from a custom store and provide the + * offsets to the consumer in the ConsumerRebalanceCallback callback. The onPartitionsAssigned callback is invoked after + * the consumer is assigned a new set of partitions on rebalance and before the consumption restarts post + * rebalance. This is the right place to supply offsets from a custom store to the consumer. *

- * Similarly, the user would also be required to plugin logic for storing the consumer's offsets to a custom store. The onPartitionsRevoked - * callback is invoked right after the consumer has stopped fetching data and before the partition ownership changes. This is the right place - * to commit the offsets for the current set of partitions owned by the consumer. + * Similarly, the user would also be required to plugin logic for storing the consumer's offsets to a custom store. The + * onPartitionsRevoked callback is invoked right after the consumer has stopped fetching data and before the partition + * ownership changes. This is the right place to commit the offsets for the current set of partitions owned by the + * consumer. + * *

- * {@code  
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("session.timeout.ms", "1000");
- * props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
- * KafkaConsumer consumer = new KafkaConsumer(props,
- *                                            new ConsumerRebalanceCallback() {
- *                                                public void onPartitionsAssigned(Consumer consumer, Collection partitions) {
- *                                                    Map lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
- *                                                    consumer.seek(lastCommittedOffsets);
- *                                                }
- *                                                public void onPartitionsRevoked(Consumer consumer, Collection partitions) {
- *                                                    Map offsets = getLastConsumedOffsets(partitions);
- *                                                    commitOffsetsToCustomStore(offsets); 
- *                                                }
- *                                                // following APIs should be implemented by the user for custom offset management
- *                                                private Map getLastCommittedOffsetsFromCustomStore(Collection partitions) {
- *                                                    return null;
- *                                                }
- *                                                private Map getLastConsumedOffsets(Collection partitions) { return null; }
- *                                                private void commitOffsetsToCustomStore(Map offsets) {}
- *                                            });
- * consumer.subscribe("foo", "bar");
- * int commitInterval = 100;
- * int numRecords = 0;
- * boolean isRunning = true;
- * Map consumedOffsets = new HashMap();
- * while(isRunning) {
- *     Map records = consumer.poll(100);
- *     Map lastConsumedOffsets = process(records);
- *     consumedOffsets.putAll(lastConsumedOffsets);
- *     numRecords += records.size();
- *     // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
- *     if(numRecords % commitInterval == 0) 
- *         commitOffsetsToCustomStore(consumedOffsets);
- * }
- * consumer.commit(true);
- * consumer.close();
+ * {
+ *     @code
+ *     Properties props = new Properties();
+ *     props.put("metadata.broker.list", "localhost:9092");
+ *     props.put("group.id", "test");
+ *     props.put("session.timeout.ms", "1000");
+ *     props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
+ *     KafkaConsumer consumer = new KafkaConsumer(props, new ConsumerRebalanceCallback() {
+ *         public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
+ *             Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
+ *             consumer.seek(lastCommittedOffsets);
+ *         }
+ * 
+ *         public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
+ *             Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions);
+ *             commitOffsetsToCustomStore(offsets);
+ *         }
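+ *         // getLastCommittedOffsetsFromCustomStore(), getLastConsumedOffsets() and commitOffsetsToCustomStore()
+ *         // are user-implemented helpers for custom offset management (their stubs are omitted here)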
+ *     });
+ *     consumer.subscribe("foo", "bar");
+ *     int commitInterval = 100;
+ *     int numRecords = 0;
+ *     boolean isRunning = true;
+ *     Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ *     while (isRunning) {
+ *         Map<String, ConsumerRecords> records = consumer.poll(100);
+ *         Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *         consumedOffsets.putAll(lastConsumedOffsets);
+ *         numRecords += records.size();
+ *         // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
+ *         if (numRecords % commitInterval == 0)
+ *             commitOffsetsToCustomStore(consumedOffsets);
+ *     }
+ *     consumer.commit(true);
+ *     consumer.close();
  * }
  * 
- * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume upto the latest - * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes - * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions. - * This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka - * based offset management. However, session.timeout.ms is not required since the Kafka consumer only does automatic failover when group + * + * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and + * consume upto the latest available message for each of those partitions before shutting down. When used to subscribe + * to specific partitions, the user foregoes the group management functionality and instead relies on manually + * configuring the consumer instances to subscribe to a set of partitions. This example assumes that the user chooses to + * use Kafka based offset storage. The user still has to specify a group.id to use Kafka based offset management. + * However, session.timeout.ms is not required since the Kafka consumer only does automatic failover when group * management is used. + * *
- * {@code  
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("enable.auto.commit", "true");
- * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer consumer = new KafkaConsumer(props);
- * // subscribe to some partitions of topic foo
- * TopicPartition partition0 = new TopicPartition("foo", 0);
- * TopicPartition partition1 = new TopicPartition("foo", 1);
- * TopicPartition[] partitions = new TopicPartition[2];
- * partitions[0] = partition0;
- * partitions[1] = partition1;
- * consumer.subscribe(partitions);
- * // find the last committed offsets for partitions 0,1 of topic foo
- * Map lastCommittedOffsets = consumer.committed(Arrays.asList(partitions));
- * // seek to the last committed offsets to avoid duplicates
- * consumer.seek(lastCommittedOffsets);        
- * // find the offsets of the latest available messages to know where to stop consumption
- * Map latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partitions));
- * boolean isRunning = true;
- * Map consumedOffsets = new HashMap();
- * while(isRunning) {
- *     Map records = consumer.poll(100);
- *     Map lastConsumedOffsets = process(records);
- *     consumedOffsets.putAll(lastConsumedOffsets);
- *     for(TopicPartition partition : partitions) {
- *         if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
- *             isRunning = false;
- *         else
- *             isRunning = true;
+ * {
+ *     @code
+ *     Properties props = new Properties();
+ *     props.put("metadata.broker.list", "localhost:9092");
+ *     props.put("group.id", "test");
+ *     props.put("enable.auto.commit", "true");
+ *     props.put("auto.commit.interval.ms", "10000");
+ *     KafkaConsumer consumer = new KafkaConsumer(props);
+ *     // subscribe to some partitions of topic foo
+ *     TopicPartition partition0 = new TopicPartition("foo", 0);
+ *     TopicPartition partition1 = new TopicPartition("foo", 1);
+ *     TopicPartition[] partitions = new TopicPartition[2];
+ *     partitions[0] = partition0;
+ *     partitions[1] = partition1;
+ *     consumer.subscribe(partitions);
+ *     // find the last committed offsets for partitions 0,1 of topic foo
+ *     Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(Arrays.asList(partitions));
+ *     // seek to the last committed offsets to avoid duplicates
+ *     consumer.seek(lastCommittedOffsets);
+ *     // find the offsets of the latest available messages to know where to stop consumption
+ *     Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partitions));
+ *     boolean isRunning = true;
+ *     Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ *     while (isRunning) {
+ *         Map<String, ConsumerRecords> records = consumer.poll(100);
+ *         Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *         consumedOffsets.putAll(lastConsumedOffsets);
+ *         for (TopicPartition partition : partitions) {
+ *             if (consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
+ *                 isRunning = false;
+ *             else
+ *                 isRunning = true;
+ *         }
  *     }
- * }
- * consumer.commit(true);
- * consumer.close();
+ *     consumer.commit(true);
+ *     consumer.close();
  * }
  * 
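The commit(true) call above synchronously commits the consumer's current offsets for everything it owns. The commit(offsets, sync) overload on the Consumer interface takes an explicit map instead. A minimal sketch of committing a single partition's offset synchronously, assuming the map is keyed by TopicPartition with Long offset values as in the other offset maps in these examples:

    // commit an explicit offset (42 is an arbitrary illustrative value) for partition 0 of topic foo,
    // blocking until the broker acknowledges it
    Map<TopicPartition, Long> offsetsToCommit = new HashMap<TopicPartition, Long>();
    offsetsToCommit.put(new TopicPartition("foo", 0), 42L);
    consumer.commit(offsetsToCommit, true);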
- * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume upto the latest - * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes - * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions. - * This example assumes that the user chooses to use custom offset storage. + * + * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and + * consume upto the latest available message for each of those partitions before shutting down. When used to subscribe + * to specific partitions, the user foregoes the group management functionality and instead relies on manually + * configuring the consumer instances to subscribe to a set of partitions. This example assumes that the user chooses to + * use custom offset storage. + * *
- * {@code  
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * KafkaConsumer consumer = new KafkaConsumer(props);
- * // subscribe to some partitions of topic foo
- * TopicPartition partition0 = new TopicPartition("foo", 0);
- * TopicPartition partition1 = new TopicPartition("foo", 1);
- * TopicPartition[] partitions = new TopicPartition[2];
- * partitions[0] = partition0;
- * partitions[1] = partition1;
- * consumer.subscribe(partitions);
- * Map lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
- * // seek to the last committed offsets to avoid duplicates
- * consumer.seek(lastCommittedOffsets);        
- * // find the offsets of the latest available messages to know where to stop consumption
- * Map latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partitions));
- * boolean isRunning = true;
- * Map consumedOffsets = new HashMap();
- * while(isRunning) {
- *     Map records = consumer.poll(100);
- *     Map lastConsumedOffsets = process(records);
- *     consumedOffsets.putAll(lastConsumedOffsets);
- *     // commit offsets for partitions 0,1 for topic foo to custom store
+ * {
+ *     @code
+ *     Properties props = new Properties();
+ *     props.put("metadata.broker.list", "localhost:9092");
+ *     KafkaConsumer consumer = new KafkaConsumer(props);
+ *     // subscribe to some partitions of topic foo
+ *     TopicPartition partition0 = new TopicPartition("foo", 0);
+ *     TopicPartition partition1 = new TopicPartition("foo", 1);
+ *     TopicPartition[] partitions = new TopicPartition[2];
+ *     partitions[0] = partition0;
+ *     partitions[1] = partition1;
+ *     consumer.subscribe(partitions);
+ *     Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
+ *     // seek to the last committed offsets to avoid duplicates
+ *     consumer.seek(lastCommittedOffsets);
+ *     // find the offsets of the latest available messages to know where to stop consumption
+ *     Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partitions));
+ *     boolean isRunning = true;
+ *     Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ *     while (isRunning) {
+ *         Map<String, ConsumerRecords> records = consumer.poll(100);
+ *         Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ *         consumedOffsets.putAll(lastConsumedOffsets);
+ *         // commit offsets for partitions 0,1 for topic foo to custom store
+ *         commitOffsetsToCustomStore(consumedOffsets);
+ *         for (TopicPartition partition : partitions) {
+ *             if (consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
+ *                 isRunning = false;
+ *             else
+ *                 isRunning = true;
+ *         }
+ *     }
  *     commitOffsetsToCustomStore(consumedOffsets);
- *     for(TopicPartition partition : partitions) {
- *         if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
- *             isRunning = false;
- *         else
- *             isRunning = true;
- *     }            
- * }      
- * commitOffsetsToCustomStore(consumedOffsets);   
- * consumer.close();
+ *     consumer.close();
  * }
  * 
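 * (Editorial note, not part of the patch: the second example relies on two application-provided helpers,
 * getLastCommittedOffsetsFromCustomStore() and commitOffsetsToCustomStore(), that are referenced but never
 * defined in this patch. A minimal in-memory sketch follows, purely illustrative; a real implementation
 * would persist the offsets durably. Imports from java.util and java.util.concurrent are assumed.

    // Hypothetical helpers only: stand-ins for an external (custom) offset store.
    private static final Map<TopicPartition, Long> offsetStore =
            new ConcurrentHashMap<TopicPartition, Long>();

    private static void commitOffsetsToCustomStore(Map<TopicPartition, Long> offsets) {
        // a real implementation would write these atomically, e.g. in a database transaction
        offsetStore.putAll(offsets);
    }

    private static Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore() {
        // return a copy so callers cannot mutate the store through the returned map
        return new HashMap<TopicPartition, Long>(offsetStore);
    }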
*/ @@ -336,11 +379,20 @@ public class KafkaConsumer implements Consumer { private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class); private final long metadataFetchTimeoutMs; - private final long totalMemorySize; + private final Time time; private final Metrics metrics; - private final Set subscribedTopics; - private final Set subscribedPartitions; - + private final Subscriptions subscriptions; + private final Metadata metadata; + private final Heartbeat heartbeat; + private final NetworkClient client; + private final List records; + private final int maxWaitMs; + private final int minBytes; + private final int fetchSize; + private final String group; + private Node consumerCoordinator; + private boolean closed = false; + /** * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings * are documented here. Values can be @@ -348,41 +400,41 @@ public class KafkaConsumer implements Consumer { * string "42" or the integer 42). *

* Valid configuration strings are documented at {@link ConsumerConfig} - * @param configs The consumer configs + * @param configs The consumer configs */ public KafkaConsumer(Map configs) { this(new ConsumerConfig(configs), null); } /** - * A consumer is instantiated by providing a set of key-value pairs as configuration and a {@link ConsumerRebalanceCallback} - * implementation + * A consumer is instantiated by providing a set of key-value pairs as configuration and a + * {@link ConsumerRebalanceCallback} implementation *

* Valid configuration strings are documented at {@link ConsumerConfig} - * @param configs The consumer configs - * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of - * every rebalance operation. + * @param configs The consumer configs + * @param callback A callback interface that the user can implement to manage customized offsets on the start and + * end of every rebalance operation. */ public KafkaConsumer(Map configs, ConsumerRebalanceCallback callback) { this(new ConsumerConfig(configs), callback); } /** - * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration. - * Valid configuration strings are documented at {@link ConsumerConfig} + * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration. Valid + * configuration strings are documented at {@link ConsumerConfig} */ public KafkaConsumer(Properties properties) { this(new ConsumerConfig(properties), null); } /** - * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a - * {@link ConsumerRebalanceCallback} implementation. + * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a + * {@link ConsumerRebalanceCallback} implementation. *

* Valid configuration strings are documented at {@link ConsumerConfig} * @param properties The consumer configuration properties - * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of - * every rebalance operation. + * @param callback A callback interface that the user can implement to manage customized offsets on the start and + * end of every rebalance operation. */ public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) { this(new ConsumerConfig(properties), callback); @@ -394,175 +446,320 @@ public class KafkaConsumer implements Consumer { private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback) { log.trace("Starting the Kafka consumer"); - subscribedTopics = new HashSet(); - subscribedPartitions = new HashSet(); - this.metrics = new Metrics(new MetricConfig(), - Collections.singletonList((MetricsReporter) new JmxReporter("kafka.consumer.")), - new SystemTime()); + this.time = new SystemTime(); + this.maxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); + this.minBytes = config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG); + this.fetchSize = config.getInt(ConsumerConfig.FETCH_BUFFER_CONFIG); this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG); - this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG); - List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + this.group = config.getString(ConsumerConfig.GROUP_ID_CONFIG); + this.records = new ArrayList(); + this.heartbeat = new Heartbeat(60 * 1000, time.milliseconds()); // TODO: make timeout configurable + + Time time = new SystemTime(); + MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)) + .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), + TimeUnit.MILLISECONDS); + String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); + String jmxPrefix = "kafka.consumer." + (clientId.length() > 0 ? clientId + "." : ""); + List reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class); + reporters.add(new JmxReporter(jmxPrefix)); + this.metrics = new Metrics(metricConfig, reporters, time); + long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); + this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG)); + List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + this.metadata.update(Cluster.bootstrap(addresses), 0); + + int maxInflightRequests = config.getInt(ConsumerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); + long reconnectBackoffMs = config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG); + int sendBuffer = config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG); + int receiveBuffer = config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG); + this.client = new NetworkClient(new Selector(this.metrics, time), + this.metadata, + clientId, + maxInflightRequests, + reconnectBackoffMs, + sendBuffer, + receiveBuffer); + this.subscriptions = new Subscriptions(); + config.logUnused(); + + consumerCoordinator = null; log.debug("Kafka consumer started"); } /** * Incrementally subscribes to the given list of topics and uses the consumer's group management functionality *

- * As part of group management, the consumer will keep track of the list of consumers that belong to a particular group and - * will trigger a rebalance operation if one of the following events trigger - + * As part of group management, the consumer will keep track of the list of consumers that belong to a particular + * group and will trigger a rebalance operation if one of the following events trigger - *

    - *
  • Number of partitions change for any of the subscribed list of topics - *
  • Topic is created or deleted - *
  • An existing member of the consumer group dies - *
  • A new member is added to an existing consumer group via the join API - *
+ *
  • Number of partitions change for any of the subscribed list of topics + *
  • Topic is created or deleted + *
  • An existing member of the consumer group dies + *
  • A new member is added to an existing consumer group via the join API + * * @param topics A variable list of topics that the consumer wants to subscribe to */ @Override public void subscribe(String... topics) { - if(subscribedPartitions.size() > 0) - throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); - for(String topic:topics) - subscribedTopics.add(topic); + ensureNotClosed(); + for (String topic : topics) + this.subscriptions.subscribe(topic); + metadata.addTopics(topics); // TODO: trigger a rebalance operation } /** - * Incrementally subscribes to a specific topic partition and does not use the consumer's group management functionality. As such, - * there will be no rebalance operation triggered when group membership or cluster and topic metadata change. + * Incrementally subscribes to a specific topic partition and does not use the consumer's group management + * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic + * metadata change. *

    * @param partitions Partitions to incrementally subscribe to */ @Override public void subscribe(TopicPartition... partitions) { - if(subscribedTopics.size() > 0) - throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); - for(TopicPartition partition:partitions) - subscribedPartitions.add(partition); + ensureNotClosed(); + for (TopicPartition tp : partitions) { + this.subscriptions.subscribe(tp); + metadata.addTopics(tp.topic()); + } } /** - * Unsubscribe from the specific topics. This will trigger a rebalance operation and messages for this topic will not be returned - * from the next {@link #poll(long) poll()} onwards + * Unsubscribe from the specific topics. This will trigger a rebalance operation and messages for this topic will + * not be returned from the next {@link #poll(long) poll()} onwards * @param topics Topics to unsubscribe from */ - public void unsubscribe(String... topics) { + public void unsubscribe(String... topics) { + ensureNotClosed(); // throw an exception if the topic was never subscribed to - for(String topic:topics) { - if(!subscribedTopics.contains(topic)) - throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" + - " to unsubscribe(" + topic + ")"); - subscribedTopics.remove(topic); - } + for (String topic : topics) + this.subscriptions.unsubscribe(topic); // TODO trigger a rebalance operation } /** - * Unsubscribe from the specific topic partitions. Messages for these partitions will not be returned from the next + * Unsubscribe from the specific topic partitions. Messages for these partitions will not be returned from the next * {@link #poll(long) poll()} onwards * @param partitions Partitions to unsubscribe from */ - public void unsubscribe(TopicPartition... partitions) { + public void unsubscribe(TopicPartition... partitions) { + ensureNotClosed(); // throw an exception if the partition was never subscribed to - for(TopicPartition partition:partitions) { - if(!subscribedPartitions.contains(partition)) - throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" + - partition.topic() + "," + partition.partition() + ") should be called prior" + - " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")"); - subscribedPartitions.remove(partition); - } - // trigger a rebalance operation + for (TopicPartition partition : partitions) + this.subscriptions.unsubscribe(partition); } - + /** - * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have subscribed to - * any topics or partitions before polling for data. - *

    - * The offset used for fetching the data is governed by whether or not {@link #seek(Map) seek(offsets)} - * is used. If {@link #seek(Map) seek(offsets)} is used, it will use the specified offsets on startup and - * on every rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed offset - * using {@link #commit(Map, boolean) commit(offsets, sync)} - * for the subscribed list of partitions. - * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely. Must not be negative + * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have + * subscribed to any topics or partitions before polling for data. + *

    + * The offset used for fetching the data is governed by whether or not {@link #seek(Map) seek(offsets)} is used. If + * {@link #seek(Map) seek(offsets)} is used, it will use the specified offsets on startup and on every rebalance, to + * consume data from that offset sequentially on every poll. If not, it will use the last checkpointed offset using + * {@link #commit(Map, boolean) commit(offsets, sync)} for the subscribed list of partitions. + * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits + * indefinitely. Must not be negative * @return map of topic to records since the last fetch for the subscribed list of topics and partitions */ @Override - public Map poll(long timeout) { - // TODO Auto-generated method stub - return null; + public List poll(long timeout) { + ensureNotClosed(); + long now = System.currentTimeMillis(); + + if (subscriptions.partitionsAutoAssigned()) { + // get partition assignment if needed + if (subscriptions.needsPartitionAssignment()) { + // do something with co-ordinator to get partition + } + // heartbeat if needed + if (heartbeat.shouldHeartbeat(now)) + sendHeartbeat(now); + } + + // fetch positions if we have partitions we don't know the offset for + if (!subscriptions.hasAllPositions()) + fetchPositions(now); + + /* initiate any needed fetches, then block for the timeout the user specified */ + Cluster cluster = this.metadata.fetch(); + reinstateFetches(cluster, now); + processResponses(client.poll(timeout, now), now); + + /* initiate a fetch request for any nodes that we just got a response from without blocking */ + reinstateFetches(cluster, now); + processResponses(client.poll(0, now), now); + + return this.records; + } + + private void reinstateFetches(Cluster cluster, long now) { + for (ClientRequest request : createFetchRequests(cluster)) { + Node node = cluster.nodeById(request.request().destination()); + if (client.ready(node, now)) + client.send(request, now); + } + } + + private void sendHeartbeat(long now) { + + } + + private List createFetchRequests(Cluster cluster) { + Map fetchable = new HashMap(); + for (TopicPartition partition : subscriptions.assignedPartitions()) { + Node node = cluster.leaderFor(partition); + // if there is a leader and no in-flight requests, issue a new fetch + if (node != null && this.client.inFlightRequestCount(node.id()) == 0) { + FetchRequest fetch = fetchable.get(partition.topic()); + if (fetch == null) { + fetch = new FetchRequest(this.maxWaitMs, this.minBytes); + fetchable.put(node.id(), fetch); + } + long offset = this.subscriptions.position(partition); + fetch.addPartition(partition.topic(), partition.partition(), offset, this.fetchSize); + } + } + List requests = new ArrayList(fetchable.size()); + for (Map.Entry entry : fetchable.entrySet()) { + RequestSend send = new RequestSend(entry.getKey(), this.client.nextRequestHeader(ApiKeys.FETCH), entry.getValue().toStruct()); + ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null); + requests.add(request); + } + return requests; + } + + private void processResponses(List responses, long now) { + // process results + for (ClientResponse response : responses) { + if (response.wasDisconnected()) + handleDisconnect(response, now); + else + handleResponse(response, now); + } + } + + /* send a request to the consumer co-ordinator */ + private void blockingCoordinatorRequest(ClientRequest request, long now) { + while (true) { + if (this.consumerCoordinator == null) + updateCoordinator(now); 
+ this.client.send(request, now); + processResponses(this.client.completeAll(consumerCoordinator.id(), now), now); + } + } + + /* update the current consumer co-ordinator */ + private void updateCoordinator(long now) { + while (this.consumerCoordinator == null) { + // find a node to ask about the co-ordinator + Node node = this.client.leastLoadedNode(now); + while (node == null) { + processResponses(this.client.poll(Integer.MAX_VALUE, now), now); + node = this.client.leastLoadedNode(now); + } + + // send the metadata request and process all responses + this.client.send(createConsumerMetadataRequest(now), now); + processResponses(this.client.completeAll(node.id(), now), now); + + // backoff if we still haven't found a co-ordinator + if (this.consumerCoordinator == null) + Utils.sleep(100); + } + } + + private void fetchPositions(long now) { + // send an OffsetFetch request to find the position + throw new IllegalStateException("No known position and OffsetFetch not implemented yet."); } /** * Commits the specified offsets for the specified list of topics and partitions to Kafka. *

    - * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance - * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. - * @param offsets The list of offsets per partition that should be committed to Kafka. - * @param sync If true, commit will block until the consumer receives an acknowledgment - * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null - * if the sync flag is set to false. + * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after + * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API + * should not be used. + * @param offsets The list of offsets per partition that should be committed to Kafka. + * @param sync If true, commit will block until the consumer receives an acknowledgment + * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. + * Returns null if the sync flag is set to false. */ @Override public OffsetMetadata commit(Map offsets, boolean sync) { + ensureNotClosed(); throw new UnsupportedOperationException(); } /** - * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and - * partitions. + * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions. *

    - * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance - * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. - * @param sync If true, commit will block until the consumer receives an acknowledgment - * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null - * if the sync flag is set to false. + * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after + * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API + * should not be used. + * @param sync If true, commit will block until the consumer receives an acknowledgment + * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. + * Returns null if the sync flag is set to false. */ @Override public OffsetMetadata commit(boolean sync) { + ensureNotClosed(); throw new UnsupportedOperationException(); } /** - * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API is invoked - * for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is - * arbitrarily used in the middle of consumption, to reset the fetch offsets + * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API + * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that + * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets */ @Override - public void seek(Map offsets) { + public void seek(TopicPartition partition, long offset) { + ensureNotClosed(); + this.subscriptions.seek(partition, offset); } /** - * Returns the fetch position of the next message for the specified topic partition to be used on the next {@link #poll(long) poll()} + * Returns the fetch position of the next message for the specified topic partition to be used on the next + * {@link #poll(long) poll()} * @param partitions Partitions for which the fetch position will be returned - * @return The position from which data will be fetched for the specified partition on the next {@link #poll(long) poll()} + * @return The position from which data will be fetched for the specified partition on the next {@link #poll(long) + * poll()} */ public Map position(Collection partitions) { - return null; + ensureNotClosed(); + Map ps = new HashMap(partitions.size()); + for (TopicPartition p : partitions) + ps.put(p, this.subscriptions.position(p)); + return ps; } /** - * Fetches the last committed offsets of partitions that the consumer currently consumes. This API is only relevant if Kafka based offset - * storage is used. This API can be used in conjunction with {@link #seek(Map) seek(offsets)} to rewind consumption of data. + * Fetches the last committed offsets of partitions that the consumer currently consumes. This API is only relevant + * if Kafka based offset storage is used. This API can be used in conjunction with {@link #seek(Map) seek(offsets)} + * to rewind consumption of data. 
* @param partitions The list of partitions to return the last committed offset for - * @return The list of offsets committed on the last {@link #commit(boolean) commit(sync)} + * @return The list of offsets committed on the last {@link #commit(boolean) commit(sync)} */ @Override public Map committed(Collection partitions) { - // TODO Auto-generated method stub + ensureNotClosed(); throw new UnsupportedOperationException(); } /** - * Fetches offsets before a certain timestamp. Note that the offsets returned are approximately computed and do not correspond to the exact - * message at the given timestamp. As such, if the consumer is rewound to offsets returned by this API, there may be duplicate messages - * returned by the consumer. + * Fetches offsets before a certain timestamp. Note that the offsets returned are approximately computed and do not + * correspond to the exact message at the given timestamp. As such, if the consumer is rewound to offsets returned + * by this API, there may be duplicate messages returned by the consumer. * @param partitions The list of partitions for which the offsets are returned - * @param timestamp The unix timestamp. Value -1 indicates earliest available timestamp. Value -2 indicates latest available timestamp. + * @param timestamp The unix timestamp. Value -1 indicates earliest available timestamp. Value -2 indicates latest + * available timestamp. * @return The offsets per partition before the specified timestamp. */ public Map offsetsBeforeTime(long timestamp, Collection partitions) { - return null; + ensureNotClosed(); + throw new UnsupportedOperationException(); } @Override @@ -570,12 +767,227 @@ public class KafkaConsumer implements Consumer { return Collections.unmodifiableMap(this.metrics.metrics()); } + public List partitionsFor(String topic) { + return this.metadata.fetch().partitionsForTopic(topic); + } + @Override public void close() { log.trace("Closing the Kafka consumer."); - subscribedTopics.clear(); - subscribedPartitions.clear(); + this.closed = true; this.metrics.close(); + this.client.close(); log.debug("The Kafka consumer has closed."); } + + /** + * Create a consumer metadata request for the given group + */ + private ClientRequest createConsumerMetadataRequest(long now) { + ConsumerMetadataRequest request = new ConsumerMetadataRequest(this.group); + Node destination = this.client.leastLoadedNode(now); + if (destination == null) // all nodes are blacked out + return null; + RequestSend send = new RequestSend(destination.id(), this.client.nextRequestHeader(ApiKeys.CONSUMER_METADATA), request.toStruct()); + ClientRequest consumerMetadataRequest = new ClientRequest(now, true, send, null); + return consumerMetadataRequest; + } + + /** + * Handle disconnections + * @param response The response + * @param now The current time + */ + private void handleDisconnect(ClientResponse response, long now) { + int correlation = response.request().request().header().correlationId(); + log.trace("Cancelled request {} with correlation id {} due to node {} being disconnected", + response.request(), + correlation, + response.request().request().destination()); + // TODO: Implement me + throw new IllegalStateException("Do something about disconnects."); + } + + private void handleResponse(ClientResponse response, long now) { + short api = response.request().request().header().apiKey(); + if (api == ApiKeys.FETCH.id) { + handleFetchResponse(new FetchResponse(response.responseBody())); + } else if (api == ApiKeys.CONSUMER_METADATA.id) { + // TODO: Implement 
me + throw new IllegalStateException("Implement me."); + } else if (api == ApiKeys.OFFSET_COMMIT.id) { + // TODO: Implement me + throw new IllegalStateException("Implement me."); + } else if (api == ApiKeys.OFFSET_FETCH.id) { + // TODO: Implement me + throw new IllegalStateException("Implement me."); + } else { + throw new IllegalStateException("Unexpected response key: " + response); + } + } + + private void handleFetchResponse(FetchResponse response) { + for (FetchResponse.PartitionFetch partition : response.partitions()) { + if (partition.error() == Errors.NONE.code()) { + long offset = -1; + ByteBuffer buffer = partition.data(); + buffer.position(buffer.limit()); // TODO: this is clearly not right + MemoryRecords records = MemoryRecords.readableRecords(buffer); + for (LogEntry entry : records) { + offset = entry.offset(); + this.records.add(new ConsumerRecord(partition.topic(), + partition.partition(), + Utils.toArray(entry.record().key()), + Utils.toArray(entry.record().value()), + offset)); + } + if (offset >= 0) + this.subscriptions.seek(new TopicPartition(partition.topic(), partition.partition()), offset + 1); + } else { + // TODO: Implement me + throw new IllegalArgumentException("Handle partition error"); + } + } + } + + /** + * Check that the consumer hasn't been closed. + */ + private void ensureNotClosed() { + if (this.closed) + throw new IllegalStateException("This consumer has already been closed."); + } + + /** + * A helper class for managing the heartbeat to the co-ordinator + */ + private static class Heartbeat { + + private long timeout; + private long lastHeartbeat; + + public Heartbeat(long timeout, long now) { + this.lastHeartbeat = now; + } + + public void heartbeat(long now) { + this.lastHeartbeat = now; + } + + public boolean isAlive(long now) { + return now - lastHeartbeat > timeout; + } + + public boolean shouldHeartbeat(long now) { + return now - lastHeartbeat > 0.3 * this.timeout; // TODO: may want to discuss criteria + } + } + + /** + * A class for tracking the topics, partitions, and offsets for the consumer + */ + private static class Subscriptions { + + /* the list of topics the user has requested */ + private final Set subscribedTopics; + + /* the list of partitions the user has requested */ + private final Set subscribedPartitions; + + /* the list of partitions currently assigned */ + private final Set assignedPartitions; + + /* the offset for each partition */ + private final Map positions; + + /* do we need to request a partition assignment from the co-ordinator? */ + private boolean needsPartitionAssignment; + + public Subscriptions() { + this.subscribedTopics = new HashSet(); + this.subscribedPartitions = new HashSet(); + this.assignedPartitions = new HashSet(); + this.positions = new HashMap(); + this.needsPartitionAssignment = false; + } + + public void subscribe(String topic) { + if (subscribedPartitions.size() > 0) + throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); + if (!this.subscribedTopics.contains(topic)) { + this.subscribedTopics.add(topic); + this.needsPartitionAssignment = true; + } + } + + public void unsubscribe(String topic) { + if (!subscribedTopics.contains(topic)) + throw new IllegalStateException("Topic " + topic + + " was never subscribed to. 
subscribe(" + + topic + + ") should be called prior" + + " to unsubscribe(" + + topic + + ")"); + subscribedTopics.remove(topic); + } + + public void subscribe(TopicPartition tp) { + if (subscribedTopics.size() > 0) + throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); + + this.subscribedPartitions.add(tp); + this.assignedPartitions.add(tp); + } + + public void unsubscribe(TopicPartition partition) { + if (!subscribedPartitions.contains(partition)) + throw new IllegalStateException("Partition " + partition + + " was never subscribed to. subscribe(new TopicPartition(" + + partition.topic() + + "," + + partition.partition() + + ") should be called prior to unsubscribe(new TopicPartition(" + + partition.topic() + + "," + + partition.partition() + + ")"); + subscribedPartitions.remove(partition); + assignedPartitions.remove(partition); + } + + public void seek(TopicPartition tp, long offset) { + if (!this.assignedPartitions.contains(tp)) + throw new IllegalStateException("Can't change the position for a partition you are not currently subscribed to."); + this.positions.put(tp, offset); + } + + public Set assignedPartitions() { + return this.assignedPartitions; + } + + public boolean partitionsAutoAssigned() { + return !this.subscribedTopics.isEmpty(); + } + + public boolean needsPartitionAssignment() { + return this.needsPartitionAssignment; + } + + public long position(TopicPartition partition) { + return this.positions.get(partition); + } + + public boolean hasAllPositions() { + return this.positions.size() >= this.assignedPartitions.size(); + } + + public Set missingPositions() { + Set copy = new HashSet(this.assignedPartitions); + for (TopicPartition p : this.positions.keySet()) + copy.remove(p); + return copy; + } + + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index c3aad3b..7c440af 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -9,7 +9,7 @@ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. -*/ + */ package org.apache.kafka.clients.consumer; import java.io.ByteArrayOutputStream; @@ -23,44 +23,45 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; + import org.apache.kafka.common.Metric; import org.apache.kafka.common.TopicPartition; /** - * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. - * This class is not threadsafe + * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is not + * threadsafe *

    - * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it - * needs to communicate with. Failure to close the consumer after use will leak these resources. + * The consumer runs in the user thread and multiplexes I/O over TCP connections to each of the brokers it needs to + * communicate with. Failure to close the consumer after use will leak these resources. */ public class MockConsumer implements Consumer { private final Set subscribedPartitions; private final Set subscribedTopics; - private final Map committedOffsets; + private final Map committedOffsets; private final Map consumedOffsets; - + public MockConsumer() { subscribedPartitions = new HashSet(); subscribedTopics = new HashSet(); committedOffsets = new HashMap(); consumedOffsets = new HashMap(); } - + @Override public void subscribe(String... topics) { - if(subscribedPartitions.size() > 0) + if (subscribedPartitions.size() > 0) throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); - for(String topic : topics) { + for (String topic : topics) { subscribedTopics.add(topic); } } @Override public void subscribe(TopicPartition... partitions) { - if(subscribedTopics.size() > 0) + if (subscribedTopics.size() > 0) throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive"); - for(TopicPartition partition : partitions) { + for (TopicPartition partition : partitions) { subscribedPartitions.add(partition); consumedOffsets.put(partition, 0L); } @@ -68,33 +69,46 @@ public class MockConsumer implements Consumer { public void unsubscribe(String... topics) { // throw an exception if the topic was never subscribed to - for(String topic:topics) { - if(!subscribedTopics.contains(topic)) - throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" + - " to unsubscribe(" + topic + ")"); + for (String topic : topics) { + if (!subscribedTopics.contains(topic)) + throw new IllegalStateException("Topic " + topic + + " was never subscribed to. subscribe(" + + topic + + ") should be called prior" + + " to unsubscribe(" + + topic + + ")"); subscribedTopics.remove(topic); } } public void unsubscribe(TopicPartition... partitions) { // throw an exception if the partition was never subscribed to - for(TopicPartition partition:partitions) { - if(!subscribedPartitions.contains(partition)) - throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" + - partition.topic() + "," + partition.partition() + ") should be called prior" + - " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")"); - subscribedPartitions.remove(partition); + for (TopicPartition partition : partitions) { + if (!subscribedPartitions.contains(partition)) + throw new IllegalStateException("Partition " + partition + + " was never subscribed to. 
subscribe(new TopicPartition(" + + partition.topic() + + "," + + partition.partition() + + ") should be called prior" + + " to unsubscribe(new TopicPartition(" + + partition.topic() + + "," + + partition.partition() + + ")"); + subscribedPartitions.remove(partition); committedOffsets.remove(partition); consumedOffsets.remove(partition); } } @Override - public Map poll(long timeout) { + public List poll(long timeout) { // hand out one dummy record, 1 per topic Map> records = new HashMap>(); Map recordMetadata = new HashMap(); - for(TopicPartition partition : subscribedPartitions) { + for (TopicPartition partition : subscribedPartitions) { // get the last consumed offset long messageSequence = consumedOffsets.get(partition); ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); @@ -107,18 +121,22 @@ public class MockConsumer implements Consumer { e.printStackTrace(); } List recordsForTopic = records.get(partition.topic()); - if(recordsForTopic == null) { + if (recordsForTopic == null) { recordsForTopic = new ArrayList(); records.put(partition.topic(), recordsForTopic); } - recordsForTopic.add(new ConsumerRecord(partition.topic(), partition.partition(), null, byteStream.toByteArray(), messageSequence)); + recordsForTopic.add(new ConsumerRecord(partition.topic(), + partition.partition(), + null, + byteStream.toByteArray(), + messageSequence)); consumedOffsets.put(partition, messageSequence); } - for(Entry> recordsPerTopic : records.entrySet()) { + for (Entry> recordsPerTopic : records.entrySet()) { Map> recordsPerPartition = new HashMap>(); - for(ConsumerRecord record : recordsPerTopic.getValue()) { + for (ConsumerRecord record : recordsPerTopic.getValue()) { List recordsForThisPartition = recordsPerPartition.get(record.partition()); - if(recordsForThisPartition == null) { + if (recordsForThisPartition == null) { recordsForThisPartition = new ArrayList(); recordsPerPartition.put(record.partition(), recordsForThisPartition); } @@ -126,38 +144,36 @@ public class MockConsumer implements Consumer { } recordMetadata.put(recordsPerTopic.getKey(), new ConsumerRecords(recordsPerTopic.getKey(), recordsPerPartition)); } - return recordMetadata; + return null; // FIXME: recordMetadata; } @Override public OffsetMetadata commit(Map offsets, boolean sync) { - if(!sync) + if (!sync) return null; - for(Entry partitionOffset : offsets.entrySet()) { - committedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue()); - } + for (Entry partitionOffset : offsets.entrySet()) { + committedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue()); + } return new OffsetMetadata(committedOffsets, null); } @Override public OffsetMetadata commit(boolean sync) { - if(!sync) + if (!sync) return null; return commit(consumedOffsets, sync); } @Override - public void seek(Map offsets) { + public void seek(TopicPartition partition, long offset) { // change the fetch offsets - for(Entry partitionOffset : offsets.entrySet()) { - consumedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue()); - } + consumedOffsets.put(partition, offset); } @Override public Map committed(Collection partitions) { Map offsets = new HashMap(); - for(TopicPartition partition : partitions) { + for (TopicPartition partition : partitions) { offsets.put(new TopicPartition(partition.topic(), partition.partition()), committedOffsets.get(partition)); } return offsets; @@ -166,26 +182,25 @@ public class MockConsumer implements Consumer { @Override public Map position(Collection partitions) { Map positions = new HashMap(); 
- for(TopicPartition partition : partitions) { + for (TopicPartition partition : partitions) { positions.put(partition, consumedOffsets.get(partition)); } return positions; } @Override - public Map offsetsBeforeTime(long timestamp, - Collection partitions) { + public Map offsetsBeforeTime(long timestamp, Collection partitions) { throw new UnsupportedOperationException(); } @Override - public Map metrics() { + public Map metrics() { return null; } @Override public void close() { - // unsubscribe from all partitions + // unsubscribe from all partitions TopicPartition[] allPartitions = new TopicPartition[subscribedPartitions.size()]; unsubscribe(subscribedPartitions.toArray(allPartitions)); } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 4aa5b01..de38a4d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -78,9 +78,9 @@ public final class Metadata { } /** - * The next time to update the cluster info is the maximum of the time the current info will expire - * and the time the current info can be updated (i.e. backoff time has elapsed); If an update has - * been request then the expiry time is now + * The next time to update the cluster info is the maximum of the time the current info will expire and the time the + * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time + * is now */ public synchronized long timeToNextUpdate(long nowMs) { long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0); @@ -114,6 +114,12 @@ public final class Metadata { } } + public synchronized void addTopics(String... 
topics) { + for (String topic : topics) + this.topics.add(topic); + requestUpdate(); + } + /** * Get the list of topics we are currently maintaining metadata for */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index a016269..b7cc9b6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -166,12 +166,14 @@ public class Sender implements Runnable { log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); } + for (ClientRequest request : requests) + client.send(request, now); // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; // otherwise the select time will be the time difference between now and the metadata expiry time; - List responses = this.client.poll(requests, result.nextReadyCheckDelayMs, now); + List responses = this.client.poll(result.nextReadyCheckDelayMs, now); for (ClientResponse response : responses) { if (response.wasDisconnected()) handleDisconnect(response, now); diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java index c62707a..68177d8 100644 --- a/clients/src/main/java/org/apache/kafka/common/Cluster.java +++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java @@ -31,6 +31,7 @@ public final class Cluster { private final Map partitionsByTopicPartition; private final Map> partitionsByTopic; private final Map> partitionsByNode; + private final Map nodesById; /** * Create a new cluster with the given nodes and partitions @@ -42,6 +43,10 @@ public final class Cluster { List copy = new ArrayList(nodes); Collections.shuffle(copy); this.nodes = Collections.unmodifiableList(copy); + + this.nodesById = new HashMap(); + for(Node node: nodes) + this.nodesById.put(node.id(), node); // index the partitions by topic/partition for quick lookup this.partitionsByTopicPartition = new HashMap(partitions.size()); @@ -102,6 +107,15 @@ public final class Cluster { public List nodes() { return this.nodes; } + + /** + * Get the node by the node id (or null if no such node exists) + * @param id The id of the node + * @return The node, or null if no such node exists + */ + public Node nodeById(int id) { + return this.nodesById.get(id); + } /** * Get the current leader for the given topic-partition diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java index b68bbf0..b5f8d83 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.common.network; @@ -51,13 +47,17 @@ public interface Selectable { public void close(); /** - * Initiate any sends provided, and make progress on any other I/O operations in-flight (connections, - * disconnections, existing sends, and receives) + * Queue the given request for sending in the subsequent {@poll(long)} calls + * @param send The request to send + */ + public void send(NetworkSend send); + + /** + * Do I/O. Reads, writes, connection establishment, etc. * @param timeout The amount of time to block if there is nothing to do - * @param sends The new sends to initiate * @throws IOException */ - public void poll(long timeout, List sends) throws IOException; + public void poll(long timeout) throws IOException; /** * The list of sends that completed on the last {@link #poll(long, List) poll()} call. 
@@ -81,4 +81,26 @@ public interface Selectable { */ public List connected(); + /** + * Disable reads from the given connection + * @param id The id for the connection + */ + public void mute(int id); + + /** + * Re-enable reads from the given connection + * @param id The id for the connection + */ + public void unmute(int id); + + /** + * Disable reads from all connections + */ + public void muteAll(); + + /** + * Re-enable reads from all connections + */ + public void unmuteAll(); + } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 93f2f1c..6d6729f 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -78,6 +78,7 @@ public class Selector implements Selectable { private final List completedReceives; private final List disconnected; private final List connected; + private final List failedSends; private final Time time; private final SelectorMetrics sensors; @@ -96,6 +97,7 @@ public class Selector implements Selectable { this.completedReceives = new ArrayList(); this.connected = new ArrayList(); this.disconnected = new ArrayList(); + this.failedSends = new ArrayList(); this.sensors = new SelectorMetrics(metrics); } @@ -172,10 +174,26 @@ public class Selector implements Selectable { } /** + * Queue the given request for sending in the subsequent {@poll(long)} calls + * @param send The request to send + */ + public void send(NetworkSend send) { + SelectionKey key = keyForId(send.destination()); + Transmissions transmissions = transmissions(key); + if (transmissions.hasSend()) + throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress."); + transmissions.send = send; + try { + key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + } catch (CancelledKeyException e) { + close(key); + this.failedSends.add(send.destination()); + } + } + + /** * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing * disconnections, initiating new sends, or making progress on in-progress sends or receives. - *

    - * The provided network sends will be started. * * When this call is completed the user can check for completed sends, receives, connections or disconnects using * {@link #completedSends()}, {@link #completedReceives()}, {@link #connected()}, {@link #disconnected()}. These @@ -183,29 +201,14 @@ public class Selector implements Selectable { * completed I/O. * * @param timeout The amount of time to wait, in milliseconds. If negative, wait indefinitely. - * @param sends The list of new sends to begin * * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is * already an in-progress send */ @Override - public void poll(long timeout, List sends) throws IOException { + public void poll(long timeout) throws IOException { clear(); - /* register for write interest on any new sends */ - for (NetworkSend send : sends) { - SelectionKey key = keyForId(send.destination()); - Transmissions transmissions = transmissions(key); - if (transmissions.hasSend()) - throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress."); - transmissions.send = send; - try { - key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); - } catch (CancelledKeyException e) { - close(key); - } - } - /* check ready keys */ long startSelect = time.nanoseconds(); int readyKeys = select(timeout); @@ -259,11 +262,14 @@ public class Selector implements Selectable { } /* cancel any defunct sockets */ - if (!key.isValid()) + if (!key.isValid()) { close(key); + this.disconnected.add(transmissions.id); + } } catch (IOException e) { log.error("Error in I/O: ", e); close(key); + this.disconnected.add(transmissions.id); } } } @@ -291,6 +297,36 @@ public class Selector implements Selectable { return this.connected; } + @Override + public void mute(int id) { + mute(this.keyForId(id)); + } + + private void mute(SelectionKey key) { + key.interestOps(key.interestOps() & ~SelectionKey.OP_READ); + } + + @Override + public void unmute(int id) { + unmute(this.keyForId(id)); + } + + private void unmute(SelectionKey key) { + key.interestOps(key.interestOps() | SelectionKey.OP_READ); + } + + @Override + public void muteAll() { + for (SelectionKey key : this.keys.values()) + mute(key); + } + + @Override + public void unmuteAll() { + for (SelectionKey key : this.keys.values()) + unmute(key); + } + /** * Clear the results from the prior poll */ @@ -299,6 +335,8 @@ public class Selector implements Selectable { this.completedReceives.clear(); this.connected.clear(); this.disconnected.clear(); + this.disconnected.addAll(this.failedSends); + this.failedSends.clear(); } /** @@ -324,7 +362,6 @@ public class Selector implements Selectable { SocketChannel channel = channel(key); Transmissions trans = transmissions(key); if (trans != null) { - this.disconnected.add(trans.id); this.keys.remove(trans.id); trans.clearReceive(); trans.clearSend(); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 6fe7573..9ad637a 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -17,8 +17,6 @@ package org.apache.kafka.common.protocol; -import java.util.ArrayList; -import java.util.List; /** * Identifiers for all the Kafka APIs @@ -30,8 +28,11 @@ public enum ApiKeys { METADATA(3, "metadata"), LEADER_AND_ISR(4, "leader_and_isr"), STOP_REPLICA(5, "stop_replica"), 
- OFFSET_COMMIT(6, "offset_commit"), - OFFSET_FETCH(7, "offset_fetch"); + UPDATE_METADATA(6, "update_metadata"), + CONTROLLED_SHUTDOWN(7, "controlled_shutdown"), + OFFSET_COMMIT(8, "offset_commit"), + OFFSET_FETCH(9, "offset_fetch"), + CONSUMER_METADATA(10, "consumer_metadata"); private static ApiKeys[] codeToType; public static int MAX_API_KEY = -1; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 044b030..dba17ed 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.common.protocol; @@ -101,9 +97,57 @@ public class Protocol { new Field("base_offset", INT64)))))))); + /* Consumer apis */ + public static Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id", + INT32, + "The node id of the consumer if the consumer is a replica"), + new Field("max_wait_ms", + INT32, + "The maximum amount of time for the server to block waiting to meet the minimum byte requirement if it cannot be met immediately."), + new Field("min_bytes", + INT32, + "The minimum number of bytes of data that must be ready, per-partition, for the request to be serviced."), + new Field("topics", + new ArrayOf(new Schema(new Field("topic", STRING), + new Field("partition_info", + new ArrayOf(new Schema(new Field("partition", + INT32, + "The partition to fetch data from"), + new Field("fetch_offset", + INT64, + "The offset to fetch from"), + new Field("max_bytes", + INT32, + "The maximum bytes to take from this partition")))))))); + + public static Schema FETCH_RESPONSE_V0 = new Schema(new Field("data", + new ArrayOf(new Schema(new Field("topic", STRING), + new Field("partition_data", + new ArrayOf(new Schema(new Field("partition", + INT32, + "The partition this data is for"), + new Field("error_code", + INT16, + "The error code if an error occurred while processing this request"), + new Field("hwm", + INT64, + "The last committed offset in this partition"), + new Field("message_set", + BYTES, + "The log bytes fetched.")))))))); + + public static Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group", STRING)); + public static Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16), new Field("coordinator", BROKER)); + + public static Schema[] CONSUMER_METADATA_REQUEST = new Schema[] { CONSUMER_METADATA_REQUEST_V0 }; + public static Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] { CONSUMER_METADATA_RESPONSE_V0 }; + public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 }; public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 }; + public static Schema[] FETCH_REQUEST = new Schema[] { FETCH_REQUEST_V0 }; + public static Schema[] FETCH_RESPONSE = new Schema[] { FETCH_RESPONSE_V0 }; + /* an array of all requests and responses with all schema versions */ public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][]; public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][]; @@ -113,22 +157,28 @@ public class Protocol { static { REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST; - REQUESTS[ApiKeys.FETCH.id] = new Schema[] {}; + REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST; REQUESTS[ApiKeys.LIST_OFFSETS.id] = new Schema[] {}; REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST; REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {}; REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; + REQUESTS[ApiKeys.UPDATE_METADATA.id] = new Schema[] {}; + REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN.id] = new Schema[] {}; REQUESTS[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {}; REQUESTS[ApiKeys.OFFSET_FETCH.id] = new Schema[] {}; + REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST; RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE; - RESPONSES[ApiKeys.FETCH.id] = new Schema[] {}; + RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE; RESPONSES[ApiKeys.LIST_OFFSETS.id] = new Schema[] {}; RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE; RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {}; RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; + 
RESPONSES[ApiKeys.UPDATE_METADATA.id] = new Schema[] {}; + RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN.id] = new Schema[] {}; RESPONSES[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {}; RESPONSES[ApiKeys.OFFSET_FETCH.id] = new Schema[] {}; + RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE; /* set the maximum version of each api */ for (ApiKeys api : ApiKeys.values()) @@ -137,11 +187,11 @@ public class Protocol { /* sanity check that we have the same number of request and response versions for each api */ for (ApiKeys api : ApiKeys.values()) if (REQUESTS[api.id].length != RESPONSES[api.id].length) - throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api " - + api.name - + " but " - + RESPONSES[api.id].length - + " response versions."); + throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api " + + api.name + + " but " + + RESPONSES[api.id].length + + " response versions."); } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index 8cecba5..bb9e7e6 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.common.protocol.types; @@ -107,6 +103,14 @@ public class Struct { return (Integer) get(name); } + public Long getLong(Field field) { + return (Long) get(field); + } + + public Long getLong(String name) { + return (Long) get(name); + } + public Object[] getArray(Field field) { return (Object[]) get(field); } @@ -123,6 +127,14 @@ public class Struct { return (String) get(name); } + public ByteBuffer getBytes(Field field) { + return (ByteBuffer) get(field); + } + + public ByteBuffer getBytes(String name) { + return (ByteBuffer) get(name); + } + /** * Set the given field to the specified value * @@ -150,9 +162,9 @@ public class Struct { } /** - * Create a struct for the schema of a container type (struct or array). - * Note that for array type, this method assumes that the type is an array of schema and creates a struct - * of that schema. Arrays of other types can't be instantiated with this method. + * Create a struct for the schema of a container type (struct or array). Note that for array type, this method + * assumes that the type is an array of schema and creates a struct of that schema. Arrays of other types can't be + * instantiated with this method. * * @param field The field to create an instance of * @return The struct diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 040e5b9..7d878eb 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -55,7 +55,7 @@ public class MemoryRecords implements Records { return emptyRecords(buffer, type, buffer.capacity()); } - public static MemoryRecords iterableRecords(ByteBuffer buffer) { + public static MemoryRecords readableRecords(ByteBuffer buffer) { return new MemoryRecords(buffer, CompressionType.NONE, false, buffer.capacity()); } @@ -94,22 +94,21 @@ public class MemoryRecords implements Records { * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be * accurate if compression is really used. When this happens, the following append may cause dynamic buffer * re-allocation in the underlying byte buffer stream. - * + * * Also note that besides the records' capacity, there is also a size limit for the batch. This size limit may be * smaller than the capacity (e.g. when appending a single message whose size is larger than the batch size, the - * capacity will be the message size, but the size limit will still be the batch size), and when the records' size has - * exceed this limit we also mark this record as full. + * capacity will be the message size, but the size limit will still be the batch size), and when the records' size + * has exceed this limit we also mark this record as full. 
*/ public boolean hasRoomFor(byte[] key, byte[] value) { - return this.writable && - this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value) && - this.sizeLimit >= this.compressor.estimatedBytesWritten(); + return this.writable && this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + + Record.recordSize(key, value) && + this.sizeLimit >= this.compressor.estimatedBytesWritten(); } public boolean isFull() { - return !this.writable || - this.capacity <= this.compressor.estimatedBytesWritten() || - this.sizeLimit <= this.compressor.estimatedBytesWritten(); + return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten() || + this.sizeLimit <= this.compressor.estimatedBytesWritten(); } /** @@ -132,7 +131,7 @@ public class MemoryRecords implements Records { public int sizeInBytes() { return compressor.buffer().position(); } - + /** * The compression rate of this record set */ diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java new file mode 100644 index 0000000..cd28077 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java @@ -0,0 +1,20 @@ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; + +public class ConsumerMetadataRequest { + + private final String group; + + public ConsumerMetadataRequest(String group) { + this.group = group; + } + + public Struct toStruct() { + Struct consumerMetadata = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id)); + consumerMetadata.set("group", this.group); + return consumerMetadata; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java new file mode 100644 index 0000000..3ea047e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java @@ -0,0 +1,44 @@ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.types.Struct; + +public class ConsumerMetadataResponse { + private final short errorCode; + private final Node coordinator; + + public ConsumerMetadataResponse() { + coordinator = null; + errorCode = 0; + } + + public ConsumerMetadataResponse(Struct struct) { + this.errorCode = (Short) struct.get("error_code"); + Struct coordinatorStruct = (Struct) struct.get("coordinator"); + int nodeId = (Integer) coordinatorStruct.get("node_id"); + String host = (String) coordinatorStruct.get("host"); + int port = (Integer) coordinatorStruct.get("port"); + this.coordinator = new Node(nodeId, host, port); + } + + public Node coordinator() { + return this.coordinator; + } + + public short error() { + return this.errorCode; + } + + @Override + public String toString() { + StringBuilder b = new StringBuilder(); + b.append('{'); + b.append("coordinator="); + b.append(this.coordinator.toString()); + b.append(','); + b.append("error="); + b.append(this.errorCode); + b.append('}'); + return b.toString(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java new file 
mode 100644 index 0000000..4294b7f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java @@ -0,0 +1,88 @@ +package org.apache.kafka.common.requests; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +/** + * Request some data from a kafka server + */ +public class FetchRequest { + + private static final Schema SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id); + + private final int replicaId; + private final int maxWaitMs; + private final int minBytes; + private final Map> partitions; + + public FetchRequest(int maxWaitMs, int minBytes) { + this(-1, maxWaitMs, minBytes, new HashMap>()); + } + + public FetchRequest(int replicaId, int maxWaitMs, int minBytes, Map> partitions) { + this.replicaId = replicaId; + this.maxWaitMs = maxWaitMs; + this.minBytes = minBytes; + this.partitions = partitions; + } + + public void addPartition(String topic, int partition, long offset, int maxBytes) { + List fetches = this.partitions.get(topic); + if (fetches == null) { + fetches = new ArrayList(); + this.partitions.put(topic, fetches); + } + fetches.add(new PartitionFetch(topic, partition, offset, maxBytes)); + } + + public Struct toStruct() { + Struct request = new Struct(SCHEMA); + request.set("replica_id", replicaId); + request.set("max_wait_ms", maxWaitMs); + request.set("min_bytes", minBytes); + + Object[] topics = new Object[partitions.size()]; + int i = 0; + for (Map.Entry> entry : partitions.entrySet()) { + Struct topicInfo = request.instance("topics"); + topicInfo.set("topic", entry.getKey()); + Object[] parts = new Object[entry.getValue().size()]; + for (int j = 0; j < parts.length; j++) { + PartitionFetch fetch = entry.getValue().get(j); + Struct partInfo = topicInfo.instance("partition_info"); + partInfo.set("partition", fetch.partition); + partInfo.set("fetch_offset", fetch.offset); + partInfo.set("max_bytes", fetch.maxBytes); + parts[j] = partInfo; + } + topicInfo.set("partition_info", parts); + topics[i] = topicInfo; + i++; + } + request.set("topics", topics); + return request; + } + + public static class PartitionFetch { + private final String topic; + private final int partition; + private final long offset; + private final int maxBytes; + + public PartitionFetch(String topic, int partition, long offset, int maxBytes) { + super(); + this.topic = topic; + this.partition = partition; + this.offset = offset; + this.maxBytes = maxBytes; + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java new file mode 100644 index 0000000..b992550 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -0,0 +1,80 @@ +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.kafka.common.protocol.types.Struct; + +public class FetchResponse { + + private List partitions; + + public FetchResponse() { + this.partitions = new ArrayList(); + } + + public FetchResponse(Struct response) { + this(); + Object[] data = response.getArray("data"); + for (Object d : data) { + Struct topicData = (Struct) d; + String topic = topicData.getString("topic"); + 
Object[] parts = topicData.getArray("partition_data"); + for (Object p : parts) { + Struct partData = (Struct) p; + int partition = partData.getInt("partition"); + short error = partData.getShort("error_code"); + long hwm = partData.getLong("hwm"); + ByteBuffer bytes = partData.getBytes("message_set"); + partitions.add(new PartitionFetch(topic, partition, error, hwm, bytes)); + } + } + } + + public void addPartition(String topic, int partition, short error, long highwaterMark, ByteBuffer data) { + this.partitions.add(new PartitionFetch(topic, partition, error, highwaterMark, data)); + } + + public List partitions() { + return this.partitions; + } + + public static class PartitionFetch { + final String topic; + final int partition; + final short error; + final long highwaterMark; + final ByteBuffer data; + + public PartitionFetch(String topic, int partition, short error, long highwaterMark, ByteBuffer data) { + super(); + this.topic = topic; + this.partition = partition; + this.error = error; + this.highwaterMark = highwaterMark; + this.data = data; + } + + public String topic() { + return topic; + } + + public int partition() { + return partition; + } + + public short error() { + return error; + } + + public long highwaterMark() { + return highwaterMark; + } + + public ByteBuffer data() { + return data; + } + + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 50af601..fe9edf4 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -160,6 +160,18 @@ public class Utils { } /** + * Sleep for a bit + * @param ms The duration of the sleep + */ + public static void sleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + // this is okay, we just wake up early + } + } + + /** * Instantiate the class */ public static Object newInstance(Class c) { diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index aae8d4a..2f7189c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -54,13 +54,25 @@ public class MockClient implements KafkaClient { } @Override - public List poll(List requests, long timeoutMs, long now) { - this.requests.addAll(requests); + public void send(ClientRequest request, long now) { + this.requests.add(request); + } + + @Override + public List poll(long timeoutMs, long now) { List copy = new ArrayList(this.responses); this.responses.clear(); return copy; } + @Override + public List completeAll(long now) { + List responses = poll(0, now); + if (requests.size() > 0) + throw new IllegalStateException("Requests without responses remain."); + return responses; + } + public Queue requests() { return this.requests; } @@ -76,6 +88,11 @@ public class MockClient implements KafkaClient { } @Override + public int inFlightRequestCount(int nodeId) { + return requests.size(); + } + + @Override public RequestHeader nextRequestHeader(ApiKeys key) { return new RequestHeader(key.id, "mock", correlation++); } diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 2f98192..7a3bc36 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -5,7 +5,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -44,14 +43,13 @@ public class NetworkClientTest { @Test public void testReadyAndDisconnect() { - List reqs = new ArrayList(); assertFalse("Client begins unready as it has no connection.", client.ready(node, time.milliseconds())); assertEquals("The connection is established as a side-effect of the readiness check", 1, selector.connected().size()); - client.poll(reqs, 1, time.milliseconds()); + client.poll(1, time.milliseconds()); selector.clear(); assertTrue("Now the client is ready", client.ready(node, time.milliseconds())); selector.disconnect(node.id()); - client.poll(reqs, 1, time.milliseconds()); + client.poll(1, time.milliseconds()); selector.clear(); assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds())); assertTrue("Metadata should get updated.", metadata.timeToNextUpdate(time.milliseconds()) == 0); @@ -63,7 +61,8 @@ public class NetworkClientTest { client.nextRequestHeader(ApiKeys.METADATA), new MetadataRequest(Arrays.asList("test")).toStruct()); ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null); - client.poll(Arrays.asList(request), 1, time.milliseconds()); + client.send(request, time.milliseconds()); + client.poll(1, time.milliseconds()); } @Test @@ -73,7 +72,8 @@ public class NetworkClientTest { RequestSend send = new RequestSend(node.id(), reqHeader, produceRequest.toStruct()); ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null); awaitReady(client, node); - client.poll(Arrays.asList(request), 1, time.milliseconds()); + client.send(request, time.milliseconds()); + client.poll(1, time.milliseconds()); assertEquals(1, client.inFlightRequestCount()); ResponseHeader respHeader = new ResponseHeader(reqHeader.correlationId()); Struct resp = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id)); @@ -84,7 +84,7 @@ public class NetworkClientTest { resp.writeTo(buffer); buffer.flip(); selector.completeReceive(new NetworkReceive(node.id(), buffer)); - List responses = client.poll(new ArrayList(), 1, time.milliseconds()); + List responses = client.poll(1, time.milliseconds()); assertEquals(1, responses.size()); ClientResponse response = responses.get(0); assertTrue("Should have a response body.", response.hasResponse()); @@ -93,7 +93,7 @@ public class NetworkClientTest { private void awaitReady(NetworkClient client, Node node) { while (!client.ready(node, time.milliseconds())) - client.poll(new ArrayList(), 1, time.milliseconds()); + client.poll(1, time.milliseconds()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java index f06e28c..d5cc2f9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java @@ -1,30 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.kafka.clients.producer; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; import java.util.List; - -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.internals.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; @@ -48,17 +41,19 @@ public class PartitionerTest { @Test public void testUserSuppliedPartitioning() { - assertEquals("If the user supplies a partition we should use it.", - 0, - partitioner.partition(new ProducerRecord("test", 0, key, value), cluster)); + assertEquals("If the user supplies a partition we should use it.", 0, partitioner.partition(new ProducerRecord("test", + 0, + key, + value), cluster)); } @Test public void testKeyPartitionIsStable() { int partition = partitioner.partition(new ProducerRecord("test", key, value), cluster); - assertEquals("Same key should yield same partition", - partition, - partitioner.partition(new ProducerRecord("test", key, "value2".getBytes()), cluster)); + assertEquals("Same key should yield same partition", partition, partitioner.partition(new ProducerRecord("test", + key, + "value2".getBytes()), + cluster)); } @Test @@ -66,9 +61,8 @@ public class PartitionerTest { int startPart = partitioner.partition(new ProducerRecord("test", value), cluster); for (int i = 1; i <= 100; i++) { int partition = partitioner.partition(new ProducerRecord("test", value), cluster); - assertEquals("Should yield a different partition each call with round-robin partitioner", - partition, (startPart + i) % 2); - } + assertEquals("Should yield a different partition each call with round-robin partitioner", partition, (startPart + i) % 2); + } } @Test diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 5c5e3d4..9ed3d6d 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ 
-12,7 +12,6 @@ */ package org.apache.kafka.common.network; -import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -73,7 +72,7 @@ public class SelectorTest { // disconnect this.server.closeConnections(); while (!selector.disconnected().contains(node)) - selector.poll(1000L, EMPTY); + selector.poll(1000L); // reconnect and do another request blockingConnect(node); @@ -88,7 +87,8 @@ public class SelectorTest { int node = 0; blockingConnect(node); selector.disconnect(node); - selector.poll(10, asList(createSend(node, "hello1"))); + selector.send(createSend(node, "hello1")); + selector.poll(10); assertEquals("Request should not have succeeded", 0, selector.completedSends().size()); assertEquals("There should be a disconnect", 1, selector.disconnected().size()); assertTrue("The disconnect should be from our node", selector.disconnected().contains(node)); @@ -103,7 +103,9 @@ public class SelectorTest { public void testCantSendWithInProgress() throws Exception { int node = 0; blockingConnect(node); - selector.poll(1000L, asList(createSend(node, "test1"), createSend(node, "test2"))); + selector.send(createSend(node, "test1")); + selector.send(createSend(node, "test2")); + selector.poll(1000L); } /** @@ -111,7 +113,8 @@ public class SelectorTest { */ @Test(expected = IllegalStateException.class) public void testCantSendWithoutConnecting() throws Exception { - selector.poll(1000L, asList(createSend(0, "test"))); + selector.send(createSend(0, "test")); + selector.poll(1000L); } /** @@ -130,7 +133,7 @@ public class SelectorTest { int node = 0; selector.connect(node, new InetSocketAddress("localhost", TestUtils.choosePort()), BUFFER_SIZE, BUFFER_SIZE); while (selector.disconnected().contains(node)) - selector.poll(1000L, EMPTY); + selector.poll(1000L); } /** @@ -151,14 +154,13 @@ public class SelectorTest { int[] requests = new int[conns]; int[] responses = new int[conns]; int responseCount = 0; - List sends = new ArrayList(); for (int i = 0; i < conns; i++) - sends.add(createSend(i, i + "-" + 0)); + selector.send(createSend(i, i + "-" + 0)); // loop until we complete all requests while (responseCount < conns * reqs) { // do the i/o - selector.poll(0L, sends); + selector.poll(0L); assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size()); @@ -174,12 +176,11 @@ public class SelectorTest { } // prepare new sends for the next round - sends.clear(); for (NetworkSend send : selector.completedSends()) { int dest = send.destination(); requests[dest]++; if (requests[dest] < reqs) - sends.add(createSend(dest, dest + "-" + requests[dest])); + selector.send(createSend(dest, dest + "-" + requests[dest])); } } } @@ -211,10 +212,34 @@ public class SelectorTest { blockingConnect(0); } + @Test + public void testMute() throws Exception { + blockingConnect(0); + blockingConnect(1); + + selector.send(createSend(0, "hello")); + selector.send(createSend(1, "hi")); + + selector.mute(1); + + while (selector.completedReceives().isEmpty()) + selector.poll(5); + assertEquals("We should have only one response", 1, selector.completedReceives().size()); + assertEquals("The response should not be from the muted node", 0, selector.completedReceives().get(0).source()); + + selector.unmute(1); + do { + selector.poll(5); + } while (selector.completedReceives().isEmpty()); + assertEquals("We should have only one response", 1, selector.completedReceives().size()); + assertEquals("The response should be from the previously muted node", 
1, selector.completedReceives().get(0).source()); + } + private String blockingRequest(int node, String s) throws IOException { - selector.poll(1000L, asList(createSend(node, s))); + selector.send(createSend(node, s)); + selector.poll(1000L); while (true) { - selector.poll(1000L, EMPTY); + selector.poll(1000L); for (NetworkReceive receive : selector.completedReceives()) if (receive.source() == node) return asString(receive); @@ -225,7 +250,7 @@ public class SelectorTest { private void blockingConnect(int node) throws IOException { selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE); while (!selector.connected().contains(node)) - selector.poll(10000L, EMPTY); + selector.poll(10000L); } private NetworkSend createSend(int node, String s) { diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java index d61de52..ea89b06 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java +++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java @@ -1,18 +1,14 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.kafka.test; @@ -26,13 +22,13 @@ import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.utils.Time; - /** * A fake selector to use for testing */ public class MockSelector implements Selectable { private final Time time; + private final List initiatedSends = new ArrayList(); private final List completedSends = new ArrayList(); private final List completedReceives = new ArrayList(); private final List disconnected = new ArrayList(); @@ -68,8 +64,14 @@ public class MockSelector implements Selectable { } @Override - public void poll(long timeout, List sends) throws IOException { - this.completedSends.addAll(sends); + public void send(NetworkSend send) { + this.initiatedSends.add(send); + } + + @Override + public void poll(long timeout) throws IOException { + this.completedSends.addAll(this.initiatedSends); + this.initiatedSends.clear(); time.sleep(timeout); } @@ -101,4 +103,20 @@ public class MockSelector implements Selectable { return connected; } + @Override + public void mute(int id) { + } + + @Override + public void unmute(int id) { + } + + @Override + public void muteAll() { + } + + @Override + public void unmuteAll() { + } + } diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala new file mode 100644 index 0000000..a273c61 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -0,0 +1,38 @@ +package kafka.api + +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import scala.collection.mutable.Buffer +import scala.collection.JavaConversions._ +import java.util.ArrayList +import org.junit.Assert._ + +class ConsumerTest extends IntegrationTestHarness(producerCount = 1, consumerCount = 1, serverCount = 2) { + + def testSimpleConsumer() { + val topic = "topic" + val part = 0 + val tp = new TopicPartition(topic, part) + val numRecords = 5000 + for(i <- 0 until numRecords) + this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes)) + + val records = new ArrayList[ConsumerRecord]() + this.consumers(0).subscribe(tp) + this.consumers(0).seek(tp, 0) + while(records.size < numRecords) + records.addAll(this.consumers(0).poll(5)) + + for(i <- 0 until numRecords) { + val record = records.get(i) + assertEquals(topic, record.topic()) + assertEquals(part, record.partition()) + assertEquals(i.toLong, record.offset()) + assertEquals(i.toString, new String(record.key())) + assertEquals(i.toString, new String(record.value())) + assertNull(record.error()) + } + } + +} \ No newline at end of file diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala new file mode 100644 index 0000000..cb8afe5 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.scalatest.junit.JUnit3Suite +import collection._ +import kafka.utils.TestUtils +import java.util.Properties +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.KafkaProducer +import kafka.server.KafkaConfig +import kafka.integration.KafkaServerTestHarness +import scala.collection.mutable.Buffer + +/** + * A helper class for writing integration tests that involve producers, consumers, and servers + */ +class IntegrationTestHarness(producerCount: Int, + consumerCount: Int, + serverCount: Int, + producerConfig: Properties = new Properties, + consumerConfig: Properties = new Properties) extends JUnit3Suite with KafkaServerTestHarness { + + override val configs = TestUtils.createBrokerConfigs(serverCount).map(new KafkaConfig(_)) + + var consumers = Buffer[KafkaConsumer]() + var producers = Buffer[KafkaProducer]() + + override def setUp() { + super.setUp() + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl) + consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl) + for(i <- 0 until producerCount) + producers += new KafkaProducer(producerConfig) + for(i <- 0 until consumerCount) + consumers += new KafkaConsumer(consumerConfig) + } + + override def tearDown() { + super.tearDown() + producers.map(_.close()) + consumers.map(_.close()) + } + +} diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 194dd70..c29f5bb 100644 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -30,6 +30,10 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness { val configs: List[KafkaConfig] var servers: List[KafkaServer] = null + + def serverForId(id: Int) = servers.find(s => s.config.brokerId == id) + + def bootstrapUrl = configs.map(c => c.hostName + ":" + c.port).mkString(",") override def setUp() { super.setUp diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 9f04bd3..cd6e06a 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -32,6 +32,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionExc import kafka.utils.{StaticPartitioner, TestUtils, Utils} import kafka.serializer.StringEncoder import java.util.Properties +import TestUtils._ /** * End to end tests of the primitive apis against a local server @@ -265,15 +266,4 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload))) } } - - /** - * For testing purposes, just create these topics each with one 
partition and one replica for - which the provided broker should the leader for. Create and wait for broker to lead. Simple. - */ - private def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Iterable[String]) { - for( topic <- topics ) { - AdminUtils.createTopic(zkClient, topic, partitions = 1, replicationFactor = 1) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition = 0) - } - } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 3faa884..6baca71 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -43,6 +43,7 @@ import kafka.producer.ProducerConfig import junit.framework.AssertionFailedError import junit.framework.Assert._ import org.apache.kafka.clients.producer.KafkaProducer +import collection.Iterable /** * Utility functions to help with testing @@ -689,6 +690,17 @@ object TestUtils extends Logging { def checkIfReassignPartitionPathExists(zkClient: ZkClient): Boolean = { ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath) } + + /** + * For testing purposes, create these topics each with one partition and one replica for + * which the provided broker should be the leader. Create the topics and wait for the broker to become the leader. + */ + def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Iterable[String]) { + for( topic <- topics ) { + AdminUtils.createTopic(zkClient, topic, partitions = 1, replicationFactor = 1) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition = 0) + } + } } object TestZKUtils {
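
To make the reworked send()/poll() flow and the new FetchRequest/FetchResponse classes concrete, a single fetch round trip might look roughly like the sketch below. It is illustrative only, not part of the patch: the topic name, timeouts, and buffer sizes are placeholders; ClientResponse exposing the response body as a Struct via responseBody() is assumed (that accessor is not shown in this diff); and the generic type parameters (List<ClientResponse>, List<PartitionFetch>) are inferred, since angle brackets appear to have been stripped from the diff text.

    import java.util.List;
    import org.apache.kafka.clients.ClientRequest;
    import org.apache.kafka.clients.ClientResponse;
    import org.apache.kafka.clients.NetworkClient;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.requests.FetchRequest;
    import org.apache.kafka.common.requests.FetchResponse;
    import org.apache.kafka.common.requests.RequestHeader;
    import org.apache.kafka.common.requests.RequestSend;

    public class FetchSketch {
        // Rough sketch: issue one fetch and print what came back.
        static void fetchOnce(NetworkClient client, Node node) {
            long now = System.currentTimeMillis();

            // ready() initiates the connection as a side effect; poll() drives the I/O.
            while (!client.ready(node, now))
                client.poll(1, now);

            // Build the FetchRequest added by this patch: one partition of a placeholder topic.
            FetchRequest fetch = new FetchRequest(100 /* maxWaitMs */, 1 /* minBytes */);
            fetch.addPartition("test", 0, 0L /* fetchOffset */, 64 * 1024 /* maxBytes */);

            RequestHeader header = client.nextRequestHeader(ApiKeys.FETCH);
            RequestSend send = new RequestSend(node.id(), header, fetch.toStruct());
            client.send(new ClientRequest(now, true, send, null), now);

            // poll() no longer takes the request list; it only does reads and writes.
            List<ClientResponse> responses = client.poll(100, now);
            for (ClientResponse response : responses) {
                if (!response.hasResponse())
                    continue;
                // responseBody() returning the response Struct is an assumption here.
                FetchResponse fetchResponse = new FetchResponse(response.responseBody());
                for (FetchResponse.PartitionFetch part : fetchResponse.partitions())
                    System.out.println(part.topic() + "-" + part.partition() + " hwm=" + part.highwaterMark());
            }
        }
    }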
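The new consumer metadata API pairs naturally with the blocking completeAll() helper: queue the request with send(), then drain the in-flight requests. The sketch below makes the same assumptions as above (responseBody() accessor, inferred generics); the group name is a placeholder, and treating error code 0 as "no error" is assumed.

    import org.apache.kafka.common.protocol.types.Struct;
    import org.apache.kafka.common.requests.ConsumerMetadataRequest;
    import org.apache.kafka.common.requests.ConsumerMetadataResponse;

    // Rough sketch: ask any ready broker for the coordinator of a consumer group.
    static Node findCoordinator(NetworkClient client, Node anyBroker, String group) {
        long now = System.currentTimeMillis();
        while (!client.ready(anyBroker, now))
            client.poll(1, now);

        RequestHeader header = client.nextRequestHeader(ApiKeys.CONSUMER_METADATA);
        Struct body = new ConsumerMetadataRequest(group).toStruct();
        client.send(new ClientRequest(now, true, new RequestSend(anyBroker.id(), header, body), null), now);

        // completeAll() polls until every in-flight request has produced a response.
        for (ClientResponse response : client.completeAll(now)) {
            ConsumerMetadataResponse metadata = new ConsumerMetadataResponse(response.responseBody()); // assumed accessor
            if (metadata.error() == 0) // assumes 0 means no error
                return metadata.coordinator();
        }
        return null;
    }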