diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 522881c..27f22aa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -116,7 +116,7 @@ public class NetworkClient implements KafkaClient { /** * Check if the node with the given id is ready to send more requests. - * @param nodeId The node id + * @param node The node id * @param now The current time in ms * @return true if the node is ready */ @@ -232,7 +232,10 @@ public class NetworkClient implements KafkaClient { found = node; } } - + if(found == null) + log.trace("Could not pick a viable least loaded node"); + else + log.trace("Picked {} as the least loaded node", found); return found; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 46efc0c..9201af8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -106,6 +106,10 @@ public class ConsumerConfig extends AbstractConfig { */ public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms"; + /** metadata.max.age.ms */ + public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms"; + private static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any " + " partition leadership changes to proactively discover any new brokers or partitions."; + /** * The total memory used by the consumer to buffer records received from the server. This config is meant to control * the consumer's memory usage, so it is the size of the global fetch buffer that will be shared across all partitions. @@ -121,6 +125,14 @@ public class ConsumerConfig extends AbstractConfig { */ public static final String FETCH_BUFFER_CONFIG = "fetch.buffer.bytes"; + /** send.buffer.bytes */ + public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes"; + private static final String SEND_BUFFER_DOC = "The size of the TCP send buffer to use when sending data"; + + /** receive.buffer.bytes */ + public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes"; + private static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer to use when reading data"; + /** * The id string to pass to the server when making requests. The purpose of this is to be able to track the source * of requests beyond just ip/port by allowing a logical application name to be included. @@ -128,16 +140,15 @@ public class ConsumerConfig extends AbstractConfig { public static final String CLIENT_ID_CONFIG = "client.id"; /** - * The size of the TCP send buffer to use when fetching data - */ - public static final String SOCKET_RECEIVE_BUFFER_CONFIG = "socket.receive.buffer.bytes"; - - /** * The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a * host in a tight loop. This backoff applies to all requests sent by the consumer to the broker. 
*/ public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms"; + /** retry.backoff.ms */ + public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms"; + private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed fetch request to a given topic partition." + " This avoids repeated fetching-and-failing in a tight loop."; + /** metrics.sample.window.ms */ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms"; private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. " @@ -151,23 +162,30 @@ public class ConsumerConfig extends AbstractConfig { public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters"; private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics."; + /** max.in.flight.requests.per.connection */ + public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection"; + private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."; + static { /* TODO: add config docs */ config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, "blah blah") .define(GROUP_ID_CONFIG, Type.STRING, Importance.HIGH, "blah blah") .define(SESSION_TIMEOUT_MS, Type.LONG, 1000, Importance.HIGH, "blah blah") .define(HEARTBEAT_FREQUENCY, Type.INT, 3, Importance.MEDIUM, "blah blah") - .define(PARTITION_ASSIGNMENT_STRATEGY, Type.STRING, Importance.MEDIUM, "blah blah") + .define(PARTITION_ASSIGNMENT_STRATEGY, Type.STRING, "blah", Importance.MEDIUM, "blah blah") .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), Importance.MEDIUM, "blah blah") + .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC) .define(ENABLE_AUTO_COMMIT_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, "blah blah") .define(AUTO_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, 5000, atLeast(0), Importance.LOW, "blah blah") .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.LOW, "blah blah") .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.LOW, "blah blah") .define(FETCH_BUFFER_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), Importance.HIGH, "blah blah") - .define(SOCKET_RECEIVE_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.LOW, "blah blah") - .define(FETCH_MIN_BYTES_CONFIG, Type.LONG, 1024, atLeast(0), Importance.HIGH, "blah blah") - .define(FETCH_MAX_WAIT_MS_CONFIG, Type.LONG, 500, atLeast(0), Importance.LOW, "blah blah") + .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, SEND_BUFFER_DOC) + .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, RECEIVE_BUFFER_DOC) + .define(FETCH_MIN_BYTES_CONFIG, Type.INT, 1024, atLeast(0), Importance.HIGH, "blah blah") + .define(FETCH_MAX_WAIT_MS_CONFIG, Type.INT, 500, atLeast(0), Importance.LOW, "blah blah") .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, 
atLeast(0L), Importance.LOW, "blah blah") + .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, RETRY_BACKOFF_MS_DOC) .define(AUTO_OFFSET_RESET_CONFIG, Type.STRING, "largest", Importance.MEDIUM, "blah blah") .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG, @@ -176,8 +194,13 @@ public class ConsumerConfig extends AbstractConfig { Importance.LOW, METRICS_SAMPLE_WINDOW_MS_DOC) .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC) - .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC); - + .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC) + .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, + Type.INT, + 5, + atLeast(1), + Importance.LOW, + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC); } ConsumerConfig(Map props) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index fe93afa..f2636af 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -9,10 +9,11 @@ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. -*/ + */ package org.apache.kafka.clients.consumer; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -21,25 +22,40 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.Map.Entry; -import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; -import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.internals.Metadata; +import org.apache.kafka.clients.producer.internals.RecordBatch; +import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Metric; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerMetadataRequest; +import org.apache.kafka.common.requests.ConsumerMetadataResponse; +import org.apache.kafka.common.requests.RequestSend; import org.apache.kafka.common.utils.ClientUtils; import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A Kafka client that consumes records from a Kafka cluster. *

- * The consumer is thread safe and should generally be shared among all threads for best performance. + * The consumer is not thread safe and should not be shared among multiple threads. *

* The consumer is single threaded and multiplexes I/O over TCP connections to each of the brokers it * needs to communicate with. Failure to close the consumer after use will leak these resources. @@ -336,11 +352,19 @@ public class KafkaConsumer implements Consumer { private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class); private final long metadataFetchTimeoutMs; - private final long totalMemorySize; private final Metrics metrics; private final Set subscribedTopics; private final Set subscribedPartitions; - + private final Metadata metadata; + private final NetworkClient networkClient; + private final int maxWaitMs; + private final int minBytes; + private final int fetchSize; + private final String group; + private Node consumerCoordinator; + private boolean consumerMetadataInProgress; + private boolean consumerMetadataUpdateNeeded; + /** * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings * are documented here. Values can be @@ -396,13 +420,40 @@ public class KafkaConsumer implements Consumer { log.trace("Starting the Kafka consumer"); subscribedTopics = new HashSet(); subscribedPartitions = new HashSet(); - this.metrics = new Metrics(new MetricConfig(), - Collections.singletonList((MetricsReporter) new JmxReporter("kafka.consumer.")), - new SystemTime()); + this.maxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); + this.minBytes = config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG); + this.fetchSize = config.getInt(ConsumerConfig.FETCH_BUFFER_CONFIG); this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG); - this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG); - List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + this.group = config.getString(ConsumerConfig.GROUP_ID_CONFIG); + + Time time = new SystemTime(); + MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)) + .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), + TimeUnit.MILLISECONDS); + String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); + String jmxPrefix = "kafka.consumer." + (clientId.length() > 0 ? clientId + "." 
: ""); + List reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class); + reporters.add(new JmxReporter(jmxPrefix)); + this.metrics = new Metrics(metricConfig, reporters, time); + long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); + this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG)); + List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + this.metadata.update(Cluster.bootstrap(addresses), 0); + + networkClient = new NetworkClient(new Selector(this.metrics, time), + this.metadata, + clientId, + config.getInt(ConsumerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), + config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG), + config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG), + config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG)); + config.logUnused(); + + consumerMetadataInProgress = false; + consumerMetadataUpdateNeeded = true; + consumerCoordinator = null; log.debug("Kafka consumer started"); } @@ -452,7 +503,7 @@ public class KafkaConsumer implements Consumer { for(String topic:topics) { if(!subscribedTopics.contains(topic)) throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" + - " to unsubscribe(" + topic + ")"); + " to unsubscribe(" + topic + ")"); subscribedTopics.remove(topic); } // TODO trigger a rebalance operation @@ -468,13 +519,13 @@ public class KafkaConsumer implements Consumer { for(TopicPartition partition:partitions) { if(!subscribedPartitions.contains(partition)) throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" + - partition.topic() + "," + partition.partition() + ") should be called prior" + - " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")"); + partition.topic() + "," + partition.partition() + ") should be called prior" + + " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")"); subscribedPartitions.remove(partition); } // trigger a rebalance operation } - + /** * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have subscribed to * any topics or partitions before polling for data. 
@@ -489,8 +540,30 @@ public class KafkaConsumer implements Consumer { */ @Override public Map poll(long timeout) { - // TODO Auto-generated method stub - return null; + long now = System.currentTimeMillis(); + Map records = new HashMap(); + List requests = new ArrayList(); + // check if consumer metadata requires an update + ClientRequest consumerMetadata = maybeUpdateConsumerMetadata(now); + // check if topic metadata requires an update + // TODO: the following cluster instance will be used while issuing fetch requests + Cluster cluster = null; + for(TopicPartition partition : subscribedPartitions) { + cluster = metadata.fetch(partition.topic(), -1L); + } + if(consumerMetadata != null) { + requests.add(consumerMetadata); + log.trace("Attempting to refresh consumer metadata {} with correlation id {}", consumerMetadata.request(), + consumerMetadata.request().header().correlationId()); + } + List responses = networkClient.poll(requests, timeout, now); + for (ClientResponse response : responses) { + if (response.wasDisconnected()) + handleDisconnect(response, now); + else + handleResponse(response, now); + } + return records; } /** @@ -570,6 +643,10 @@ public class KafkaConsumer implements Consumer { return Collections.unmodifiableMap(this.metrics.metrics()); } + public List partitionsFor(String topic) { + return this.metadata.fetch(topic, -1L).partitionsForTopic(topic); + } + @Override public void close() { log.trace("Closing the Kafka consumer."); @@ -578,4 +655,70 @@ public class KafkaConsumer implements Consumer { this.metrics.close(); log.debug("The Kafka consumer has closed."); } + + /** + * Create a consumer metadata request for the given group + */ + private ClientRequest maybeUpdateConsumerMetadata(long now) { + if(consumerMetadataInProgress || !consumerMetadataUpdateNeeded) + return null; + ConsumerMetadataRequest request = new ConsumerMetadataRequest(this.group); + Node destination = this.networkClient.leastLoadedNode(now); + if(destination == null) // all nodes are blacked out + return null; + if(!networkClient.isReady(destination, now)) { + networkClient.ready(destination, now); + return null; + } + RequestSend send = new RequestSend(destination.id(), + this.networkClient.nextRequestHeader(ApiKeys.CONSUMER_METADATA), + request.toStruct()); + ClientRequest consumerMetadataRequest = new ClientRequest(now, true, send, null); + consumerMetadataInProgress = true; + consumerMetadataUpdateNeeded = false; + return consumerMetadataRequest; + } + + private void handleDisconnect(ClientResponse response, long now) { + int correlation = response.request().request().header().correlationId(); + log.trace("Cancelled request {} with correlation id {} due to node {} being disconnected", response.request(), correlation, + response.request().request().destination()); + if(response.request().request().header().apiKey() == ApiKeys.CONSUMER_METADATA.id) { + this.consumerMetadataInProgress = false; + this.consumerMetadataUpdateNeeded = true; + } + metadata.forceUpdate(); + } + + /** + * Handle a response + */ + private void handleResponse(ClientResponse response, long now) { + int correlationId = response.request().request().header().correlationId(); + log.trace("Received {} response {} from node {} with correlation id {}", + ApiKeys.forId(response.request().request().header().apiKey()), + response.responseBody(), + response.request().request().destination(), + correlationId); + // if we have a response, parse it + if (response.hasResponse()) { + if(response.request().request().header().apiKey() == 
ApiKeys.CONSUMER_METADATA.id) + handleConsumerMetadataResponse(response, now); + else + throw new IllegalStateException("Unrecognized API in response " + ApiKeys.forId(response.request().request().header().apiKey())); + } else { + // every response for a consumer's request should have response content, unless there was a network disconnect + throw new IllegalStateException("Response for " + ApiKeys.forId(response.request().request().header().apiKey()) + " returned with empty content"); + } + } + + private void handleConsumerMetadataResponse(ClientResponse response, long now) { + ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(response.responseBody()); + if(consumerMetadataResponse.error() == Errors.NONE.code()) + this.consumerCoordinator = consumerMetadataResponse.coordinator(); + else + this.consumerMetadataUpdateNeeded = true; + this.consumerMetadataInProgress = false; + return; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 57bc285..e8e2904 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -74,7 +74,7 @@ public final class Metadata { * Fetch cluster metadata including partitions for the given topic. If there is no metadata for the given topic, * block waiting for an update. * @param topic The topic we want metadata for - * @param maxWaitMs The maximum amount of time to block waiting for metadata + * @param maxWaitMs The maximum amount of time to block waiting for metadata. -1 indicates no blocking */ public synchronized Cluster fetch(String topic, long maxWaitMs) { List partitions = null; @@ -87,7 +87,11 @@ public final class Metadata { forceUpdate = true; try { log.trace("Requesting metadata update for topic {}.", topic); - wait(remainingWaitMs); + if(maxWaitMs != -1L) { + log.trace("Waiting for {} time", remainingWaitMs); + wait(remainingWaitMs); + } else + return cluster; } catch (InterruptedException e) { /* this is fine, just try again */ } long elapsed = System.currentTimeMillis() - begin; diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java index b15aa2c..b16c914 100644 --- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java +++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java @@ -12,6 +12,9 @@ */ package org.apache.kafka.common; +import java.util.Arrays; +import java.util.Set; + /** * Information about a topic-partition. 
*/ @@ -20,10 +23,10 @@ public class PartitionInfo { private final String topic; private final int partition; private final Node leader; - private final Node[] replicas; - private final Node[] inSyncReplicas; + private final Set replicas; + private final Set inSyncReplicas; - public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) { + public PartitionInfo(String topic, int partition, Node leader, Set replicas, Set inSyncReplicas) { this.topic = topic; this.partition = partition; this.leader = leader; @@ -55,7 +58,7 @@ public class PartitionInfo { /** * The complete set of replicas for this partition regardless of whether they are alive or up-to-date */ - public Node[] replicas() { + public Set replicas() { return replicas; } @@ -63,11 +66,45 @@ public class PartitionInfo { * The subset of the replicas that are in sync, that is caught-up to the leader and ready to take over as leader if * the leader should fail */ - public Node[] inSyncReplicas() { + public Set inSyncReplicas() { return inSyncReplicas; } @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + topic.hashCode(); + result = prime * result + partition; + result = prime * result + leader.hashCode(); + result = prime * result + replicas.hashCode(); + result = prime * result + inSyncReplicas.hashCode(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + PartitionInfo other = (PartitionInfo) obj; + if (!topic.equals(other.topic)) + return false; + if (partition != other.partition) + return false; + if (!leader.equals(other.leader)) + return false; + if (!replicas.equals(other.replicas)) + return false; + if (!inSyncReplicas.equals(other.inSyncReplicas)) + return false; + return true; + } + + @Override public String toString() { return String.format("Partition(topic = %s, partition = %d, leader = %d, replicas = %s, isr = %s", topic, @@ -78,15 +115,14 @@ public class PartitionInfo { } /* Extract the node ids from each item in the array and format for display */ - private String fmtNodeIds(Node[] nodes) { + private String fmtNodeIds(Set nodes) { StringBuilder b = new StringBuilder("["); - for (int i = 0; i < nodes.length - 1; i++) { - b.append(Integer.toString(nodes[i].id())); - b.append(','); - } - if (nodes.length > 0) { - b.append(Integer.toString(nodes[nodes.length - 1].id())); - b.append(','); + int size = 0; + for (Node node : nodes) { + b.append(Integer.toString(node.id())); + size += 1; + if(size < nodes.size()) + b.append(','); } b.append("]"); return b.toString(); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 6fe7573..9ad637a 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -17,8 +17,6 @@ package org.apache.kafka.common.protocol; -import java.util.ArrayList; -import java.util.List; /** * Identifiers for all the Kafka APIs @@ -30,8 +28,11 @@ public enum ApiKeys { METADATA(3, "metadata"), LEADER_AND_ISR(4, "leader_and_isr"), STOP_REPLICA(5, "stop_replica"), - OFFSET_COMMIT(6, "offset_commit"), - OFFSET_FETCH(7, "offset_fetch"); + UPDATE_METADATA(6, "update_metadata"), + CONTROLLED_SHUTDOWN(7, "controlled_shutdown"), + OFFSET_COMMIT(8, "offset_commit"), + OFFSET_FETCH(9, 
"offset_fetch"), + CONSUMER_METADATA(10, "consumer_metadata"); private static ApiKeys[] codeToType; public static int MAX_API_KEY = -1; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 044b030..c4ac1fd 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -101,6 +101,14 @@ public class Protocol { new Field("base_offset", INT64)))))))); + /* Consume api */ + public static Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group", STRING)); + public static Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16), + new Field("coordinator", BROKER)); + + public static Schema[] CONSUMER_METADATA_REQUEST = new Schema[] { CONSUMER_METADATA_REQUEST_V0 }; + public static Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] { CONSUMER_METADATA_RESPONSE_V0 }; + public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 }; public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 }; @@ -118,8 +126,11 @@ public class Protocol { REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST; REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {}; REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; + REQUESTS[ApiKeys.UPDATE_METADATA.id] = new Schema[] {}; + REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN.id] = new Schema[] {}; REQUESTS[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {}; REQUESTS[ApiKeys.OFFSET_FETCH.id] = new Schema[] {}; + REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST; RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE; RESPONSES[ApiKeys.FETCH.id] = new Schema[] {}; @@ -127,8 +138,11 @@ public class Protocol { RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE; RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {}; RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {}; + RESPONSES[ApiKeys.UPDATE_METADATA.id] = new Schema[] {}; + RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN.id] = new Schema[] {}; RESPONSES[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {}; RESPONSES[ApiKeys.OFFSET_FETCH.id] = new Schema[] {}; + RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE; /* set the maximum version of each api */ for (ApiKeys api : ApiKeys.values()) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java new file mode 100644 index 0000000..cd28077 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java @@ -0,0 +1,20 @@ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; + +public class ConsumerMetadataRequest { + + private final String group; + + public ConsumerMetadataRequest(String group) { + this.group = group; + } + + public Struct toStruct() { + Struct consumerMetadata = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id)); + consumerMetadata.set("group", this.group); + return consumerMetadata; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java new file mode 100644 index 0000000..3ea047e --- /dev/null +++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java @@ -0,0 +1,44 @@ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.types.Struct; + +public class ConsumerMetadataResponse { + private final short errorCode; + private final Node coordinator; + + public ConsumerMetadataResponse() { + coordinator = null; + errorCode = 0; + } + + public ConsumerMetadataResponse(Struct struct) { + this.errorCode = (Short) struct.get("error_code"); + Struct coordinatorStruct = (Struct) struct.get("coordinator"); + int nodeId = (Integer) coordinatorStruct.get("node_id"); + String host = (String) coordinatorStruct.get("host"); + int port = (Integer) coordinatorStruct.get("port"); + this.coordinator = new Node(nodeId, host, port); + } + + public Node coordinator() { + return this.coordinator; + } + + public short error() { + return this.errorCode; + } + + @Override + public String toString() { + StringBuilder b = new StringBuilder(); + b.append('{'); + b.append("coordinator="); + b.append(this.coordinator.toString()); + b.append(','); + b.append("error="); + b.append(this.errorCode); + b.append('}'); + return b.toString(); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 2652c32..1355560 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -14,8 +14,10 @@ package org.apache.kafka.common.requests; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; @@ -60,13 +62,13 @@ public class MetadataResponse { int leader = partitionInfo.getInt("leader"); Node leaderNode = leader == -1 ? 
null : brokers.get(leader); Object[] replicas = (Object[]) partitionInfo.get("replicas"); - Node[] replicaNodes = new Node[replicas.length]; + Set replicaNodes = new HashSet(); for (int k = 0; k < replicas.length; k++) - replicaNodes[k] = brokers.get(replicas[k]); + replicaNodes.add(brokers.get(replicas[k])); Object[] isr = (Object[]) partitionInfo.get("isr"); - Node[] isrNodes = new Node[isr.length]; + Set isrNodes = new HashSet(); for (int k = 0; k < isr.length; k++) - isrNodes[k] = brokers.get(isr[k]); + isrNodes.add(brokers.get(isr[k])); partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes)); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java index f06e28c..df58cdc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java @@ -18,13 +18,12 @@ package org.apache.kafka.clients.producer; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; +import java.util.HashSet; import java.util.List; +import java.util.Set; - -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.internals.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; @@ -39,7 +38,7 @@ public class PartitionerTest { private Node node0 = new Node(0, "localhost", 99); private Node node1 = new Node(1, "localhost", 100); private Node node2 = new Node(2, "localhost", 101); - private Node[] nodes = new Node[] { node0, node1, node2 }; + private Set nodes = new HashSet(asList(node0, node1, node2)); private String topic = "test"; private List partitions = asList(new PartitionInfo(topic, 0, node0, nodes, nodes), new PartitionInfo(topic, 1, node1, nodes, nodes), diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 76a17e8..1d014fc 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -22,8 +22,10 @@ import java.io.File; import java.io.IOException; import java.net.ServerSocket; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.Set; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; @@ -50,13 +52,15 @@ public class TestUtils { } public static Cluster clusterWith(int nodes, String topic, int partitions) { - Node[] ns = new Node[nodes]; + Set ns = new HashSet(); for (int i = 0; i < nodes; i++) - ns[i] = new Node(0, "localhost", 1969); + ns.add(new Node(0, "localhost", 1969)); + Node[] nodeArray = new Node[ns.size()]; + ns.toArray(nodeArray); List parts = new ArrayList(); for (int i = 0; i < partitions; i++) - parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns)); - return new Cluster(asList(ns), parts); + parts.add(new PartitionInfo(topic, i, nodeArray[i % ns.size()], ns, ns)); + return new Cluster(ns, parts); } /** diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerTest.scala new file mode 100644 index 0000000..3ae1d21 --- /dev/null +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerTest.scala @@ -0,0 +1,130 @@ +/** + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.integration + +import kafka.server.{KafkaServer, KafkaRequestHandler, KafkaConfig} +import org.apache.log4j.Logger +import kafka.zk.ZooKeeperTestHarness +import org.scalatest.junit.JUnit3Suite +import collection._ +import kafka.utils.{Utils, TestUtils} +import java.util.Properties +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.{Node, PartitionInfo} +import scala.collection.JavaConversions._ + +/** + * End to end tests of the consumer apis against a local server + */ +class ConsumerTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness { + val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) + + private val brokerId1 = 0 + private val brokerId2 = 1 + private val ports = TestUtils.choosePorts(2) + private val (port1, port2) = (ports(0), ports(1)) + private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false) + private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false) + private val config1 = new KafkaConfig(props1) + private val config2 = new KafkaConfig(props2) + + private val topic = "newtopic" + val configs = List(config1, config2) + private val nodes = configs.map(config => new Node(config.brokerId, config.hostName, config.port)).toSet + private val nodeMap = nodes.map(node => (node.id(), node)).toMap + + private var serverMap: Map[Int, KafkaServer] = new mutable.HashMap[Int, KafkaServer]() + + override def setUp() { + super.setUp() + serverMap += servers.head.config.brokerId -> servers.head + serverMap += servers.last.config.brokerId -> servers.last + } + + def testTopicMetadataDiscoveryInConsumer() { + // create topic + TestUtils.createTopic(zkClient, topic, 1, 2, servers) + var leader = TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) + val props = new Properties() + props.put("bootstrap.servers", configs.map(config => config.hostName + ":" + config.port).foldLeft("")((url,str) => url + "," + str)) + props.put("group.id", "test") + val leaderNode = nodeMap(leader) + import scala.collection.JavaConversions._ + val partitionInfo = new PartitionInfo(topic, 0, leaderNode, nodes, nodes) + val consumer = new KafkaConsumer(props) + TestUtils.waitUntilTrue(() => { + val returnedPartitionInfo = pollUntilMetadataRefresh(consumer, topic, 1000) + println("Received partition info = " + returnedPartitionInfo + " Compared to " + partitionInfo) + if(returnedPartitionInfo != null) { + returnedPartitionInfo == partitionInfo + } + else false + }, "Topic metadata was not refreshed", 2000) + consumer.close() + } + + def testMetadataRefreshAfterFailure() { + // create topic + TestUtils.createTopic(zkClient, topic, 1, 2, servers) + var leader = 
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) + val props = new Properties() + props.put("bootstrap.servers", configs.map(config => config.hostName + ":" + config.port).foldLeft("")((url,str) => url + "," + str)) + props.put("group.id", "test") + props.put("metadata.max.age.ms", "500") + props.put("reconnect.backoff.ms", "500") + val leaderNode = nodeMap(leader) + val partitionInfo = new PartitionInfo(topic, 0, leaderNode, nodes, nodes) + val consumer = new KafkaConsumer(props) + TestUtils.waitUntilTrue(() => { + val returnedPartitionInfo = pollUntilMetadataRefresh(consumer, topic, 500) + if(returnedPartitionInfo != null) { + returnedPartitionInfo == partitionInfo + } + else false + }, "Topic metadata was not refreshed", 1000) + + val leaderServer = serverMap(leader) + leaderServer.shutdown() + leaderServer.awaitShutdown() + + // double check that the leader info has been propagated after consecutive bounces + val newLeader = TestUtils.waitUntilMetadataIsPropagated(servers.filterNot(s => s.config.brokerId == leader), topic, 0, 1000) + val newLeaderNode = nodeMap(newLeader) + val newPartitionInfo = new PartitionInfo(topic, 0, newLeaderNode, nodes, nodes - leaderNode) + TestUtils.waitUntilTrue(() => { + val returnedPartitionInfo = pollUntilMetadataRefresh(consumer, topic, 500) + if(returnedPartitionInfo != null) { + returnedPartitionInfo == newPartitionInfo + } + else false + }, "Topic metadata was not refreshed", 2000) + consumer.close() + } + + private def pollUntilMetadataRefresh(consumer: KafkaConsumer, topic: String, timeout: Int): PartitionInfo = { + var remainingTime = timeout + while(remainingTime >= 0) { + consumer.poll(100) + remainingTime -= 100 + Thread.sleep(100) + } + val partitionsInfo = consumer.partitionsFor(topic) + val partitionInfo = if(partitionsInfo != null) partitionsInfo.get(0) else null + partitionInfo + } +} diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 9f04bd3..cd6e06a 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -32,6 +32,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionExc import kafka.utils.{StaticPartitioner, TestUtils, Utils} import kafka.serializer.StringEncoder import java.util.Properties +import TestUtils._ /** * End to end tests of the primitive apis against a local server @@ -265,15 +266,4 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload))) } } - - /** - * For testing purposes, just create these topics each with one partition and one replica for - * which the provided broker should the leader for. Create and wait for broker to lead. Simple. 
- */ - private def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Iterable[String]) { - for( topic <- topics ) { - AdminUtils.createTopic(zkClient, topic, partitions = 1, replicationFactor = 1) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition = 0) - } - } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 57b2bd5..e71f912 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -43,6 +43,7 @@ import kafka.producer.ProducerConfig import junit.framework.AssertionFailedError import junit.framework.Assert._ import org.apache.kafka.clients.producer.KafkaProducer +import collection.Iterable /** * Utility functions to help with testing @@ -682,6 +683,17 @@ object TestUtils extends Logging { def checkIfReassignPartitionPathExists(zkClient: ZkClient): Boolean = { ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath) } + + /** + * For testing purposes, create these topics, each with one partition and one replica, for + * which the provided broker should be the leader. Create the topics and wait for the broker to become leader. + */ + def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Iterable[String]) { + for( topic <- topics ) { + AdminUtils.createTopic(zkClient, topic, partitions = 1, replicationFactor = 1) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition = 0) + } + } } object TestZKUtils {
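
Reviewer note (not part of the patch): a minimal Java sketch of how the configuration keys and the partitionsFor() API introduced above might be exercised from application code. It mirrors the polling loop in ConsumerTest.scala, which repeatedly polls to let the client refresh topic and consumer metadata. The broker address, topic name, group id, and timing values below are placeholders, and the class itself is hypothetical, not part of this change.

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

public class ConsumerMetadataExample {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "example-group");           // placeholder group id
        // configuration keys added in this patch
        props.put("metadata.max.age.ms", "500");          // force a periodic metadata refresh
        props.put("retry.backoff.ms", "100");
        props.put("send.buffer.bytes", "131072");
        props.put("receive.buffer.bytes", "32768");
        props.put("max.in.flight.requests.per.connection", "5");

        KafkaConsumer consumer = new KafkaConsumer(props);
        try {
            // poll() drives the network client, so topic and consumer coordinator
            // metadata are refreshed as a side effect of polling (as in the test)
            for (int i = 0; i < 10; i++) {
                consumer.poll(100);
                Thread.sleep(100);
            }
            List partitions = consumer.partitionsFor("example-topic"); // placeholder topic
            if (partitions != null) {
                for (Object p : partitions) {
                    PartitionInfo info = (PartitionInfo) p;
                    Node leader = info.leader();
                    System.out.println("partition " + info.partition()
                            + " leader " + (leader == null ? "none" : leader.id()));
                }
            }
        } finally {
            consumer.close();
        }
    }
}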