diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 522881c..27f22aa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -116,7 +116,7 @@ public class NetworkClient implements KafkaClient {
/**
* Check if the node with the given id is ready to send more requests.
- * @param nodeId The node id
+ * @param node The node to check
* @param now The current time in ms
* @return true if the node is ready
*/
@@ -232,7 +232,10 @@ public class NetworkClient implements KafkaClient {
found = node;
}
}
-
+ if (found == null)
+ log.trace("Could not pick a viable least loaded node");
+ else
+ log.trace("Picked {} as the least loaded node", found);
return found;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 46efc0c..9201af8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -106,6 +106,10 @@ public class ConsumerConfig extends AbstractConfig {
*/
public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
+ /** metadata.max.age.ms */
+ public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
+ private static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any " + "partition leadership changes to proactively discover any new brokers or partitions.";
+
/**
* The total memory used by the consumer to buffer records received from the server. This config is meant to control
* the consumer's memory usage, so it is the size of the global fetch buffer that will be shared across all partitions.
@@ -121,6 +125,14 @@ public class ConsumerConfig extends AbstractConfig {
*/
public static final String FETCH_BUFFER_CONFIG = "fetch.buffer.bytes";
+ /** send.buffer.bytes */
+ public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
+ private static final String SEND_BUFFER_DOC = "The size of the TCP send buffer to use when sending data";
+
+ /** receive.buffer.bytes */
+ public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
+ private static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer to use when reading data";
+
/**
* The id string to pass to the server when making requests. The purpose of this is to be able to track the source
* of requests beyond just ip/port by allowing a logical application name to be included.
@@ -128,16 +140,15 @@ public class ConsumerConfig extends AbstractConfig {
public static final String CLIENT_ID_CONFIG = "client.id";
/**
- * The size of the TCP send buffer to use when fetching data
- */
- public static final String SOCKET_RECEIVE_BUFFER_CONFIG = "socket.receive.buffer.bytes";
-
- /**
* The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a
* host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.
*/
public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
+ /** retry.backoff.ms */
+ public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
+ private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed fetch request to a given topic partition." + " This avoids repeated fetching-and-failing in a tight loop.";
+
/** metrics.sample.window.ms */
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. "
@@ -151,23 +162,30 @@ public class ConsumerConfig extends AbstractConfig {
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
+ /** max.in.flight.requests.per.connection */
+ public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
+ private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking.";
+
static {
/* TODO: add config docs */
config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, "blah blah")
.define(GROUP_ID_CONFIG, Type.STRING, Importance.HIGH, "blah blah")
.define(SESSION_TIMEOUT_MS, Type.LONG, 1000, Importance.HIGH, "blah blah")
.define(HEARTBEAT_FREQUENCY, Type.INT, 3, Importance.MEDIUM, "blah blah")
- .define(PARTITION_ASSIGNMENT_STRATEGY, Type.STRING, Importance.MEDIUM, "blah blah")
+ .define(PARTITION_ASSIGNMENT_STRATEGY, Type.STRING, "blah", Importance.MEDIUM, "blah blah")
.define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), Importance.MEDIUM, "blah blah")
+ .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
.define(ENABLE_AUTO_COMMIT_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, "blah blah")
.define(AUTO_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, 5000, atLeast(0), Importance.LOW, "blah blah")
.define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.LOW, "blah blah")
.define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.LOW, "blah blah")
.define(FETCH_BUFFER_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), Importance.HIGH, "blah blah")
- .define(SOCKET_RECEIVE_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.LOW, "blah blah")
- .define(FETCH_MIN_BYTES_CONFIG, Type.LONG, 1024, atLeast(0), Importance.HIGH, "blah blah")
- .define(FETCH_MAX_WAIT_MS_CONFIG, Type.LONG, 500, atLeast(0), Importance.LOW, "blah blah")
+ .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, SEND_BUFFER_DOC)
+ .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, RECEIVE_BUFFER_DOC)
+ .define(FETCH_MIN_BYTES_CONFIG, Type.INT, 1024, atLeast(0), Importance.HIGH, "blah blah")
+ .define(FETCH_MAX_WAIT_MS_CONFIG, Type.INT, 500, atLeast(0), Importance.LOW, "blah blah")
.define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, "blah blah")
+ .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, RETRY_BACKOFF_MS_DOC)
.define(AUTO_OFFSET_RESET_CONFIG, Type.STRING, "largest", Importance.MEDIUM, "blah blah")
.define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
Type.LONG,
@@ -176,8 +194,13 @@ public class ConsumerConfig extends AbstractConfig {
Importance.LOW,
METRICS_SAMPLE_WINDOW_MS_DOC)
.define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
- .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC);
-
+ .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
+ .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
+ Type.INT,
+ 5,
+ atLeast(1),
+ Importance.LOW,
+ MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC);
}
ConsumerConfig(Map<? extends Object, ? extends Object> props) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index fe93afa..f2636af 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -9,10 +9,11 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
-*/
+ */
package org.apache.kafka.clients.consumer;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -21,25 +22,40 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.Map.Entry;
-import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
-import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.producer.internals.Metadata;
+import org.apache.kafka.clients.producer.internals.RecordBatch;
+import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerMetadataRequest;
+import org.apache.kafka.common.requests.ConsumerMetadataResponse;
+import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.utils.ClientUtils;
import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Kafka client that consumes records from a Kafka cluster.
*
- * The consumer is thread safe and should generally be shared among all threads for best performance.
+ * The consumer is not thread safe and should not be shared across threads.
*
* The consumer is single threaded and multiplexes I/O over TCP connections to each of the brokers it
* needs to communicate with. Failure to close the consumer after use will leak these resources.
@@ -336,11 +352,19 @@ public class KafkaConsumer implements Consumer {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
private final long metadataFetchTimeoutMs;
- private final long totalMemorySize;
private final Metrics metrics;
private final Set<String> subscribedTopics;
private final Set<TopicPartition> subscribedPartitions;
-
+ private final Metadata metadata;
+ private final NetworkClient networkClient;
+ private final int maxWaitMs;
+ private final int minBytes;
+ private final int fetchSize;
+ private final String group;
+ private Node consumerCoordinator;
+ private boolean consumerMetadataInProgress;
+ private boolean consumerMetadataUpdateNeeded;
+
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
* are documented here. Values can be
@@ -396,13 +420,40 @@ public class KafkaConsumer implements Consumer {
log.trace("Starting the Kafka consumer");
subscribedTopics = new HashSet();
subscribedPartitions = new HashSet();
- this.metrics = new Metrics(new MetricConfig(),
- Collections.singletonList((MetricsReporter) new JmxReporter("kafka.consumer.")),
- new SystemTime());
+ this.maxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
+ this.minBytes = config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG);
+ this.fetchSize = config.getInt(ConsumerConfig.FETCH_BUFFER_CONFIG);
this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
- this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
- List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ this.group = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
+
+ Time time = new SystemTime();
+ MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
+ .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+ TimeUnit.MILLISECONDS);
+ String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+ String jmxPrefix = "kafka.consumer." + (clientId.length() > 0 ? clientId + "." : "");
+ List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
+ MetricsReporter.class);
+ reporters.add(new JmxReporter(jmxPrefix));
+ this.metrics = new Metrics(metricConfig, reporters, time);
+ long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+ this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
+ List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ this.metadata.update(Cluster.bootstrap(addresses), 0);
+
+ networkClient = new NetworkClient(new Selector(this.metrics, time),
+ this.metadata,
+ clientId,
+ config.getInt(ConsumerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
+ config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+ config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
+ config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG));
+
config.logUnused();
+
+ consumerMetadataInProgress = false;
+ consumerMetadataUpdateNeeded = true;
+ consumerCoordinator = null;
log.debug("Kafka consumer started");
}
@@ -452,7 +503,7 @@ public class KafkaConsumer implements Consumer {
for(String topic:topics) {
if(!subscribedTopics.contains(topic))
throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" +
- " to unsubscribe(" + topic + ")");
+ " to unsubscribe(" + topic + ")");
subscribedTopics.remove(topic);
}
// TODO trigger a rebalance operation
@@ -468,13 +519,13 @@ public class KafkaConsumer implements Consumer {
for(TopicPartition partition:partitions) {
if(!subscribedPartitions.contains(partition))
throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" +
- partition.topic() + "," + partition.partition() + ") should be called prior" +
- " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")");
+ partition.topic() + "," + partition.partition() + ") should be called prior" +
+ " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")");
subscribedPartitions.remove(partition);
}
// trigger a rebalance operation
}
-
+
/**
* Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have subscribed to
* any topics or partitions before polling for data.
@@ -489,8 +540,30 @@ public class KafkaConsumer implements Consumer {
*/
@Override
public Map poll(long timeout) {
- // TODO Auto-generated method stub
- return null;
+ long now = System.currentTimeMillis();
+ Map records = new HashMap();
+ List<ClientRequest> requests = new ArrayList<ClientRequest>();
+ // check if consumer metadata requires an update
+ ClientRequest consumerMetadata = maybeUpdateConsumerMetadata(now);
+ // check if topic metadata requires an update
+ // TODO: the following cluster instance will be used while issuing fetch requests
+ Cluster cluster = null;
+ for(TopicPartition partition : subscribedPartitions) {
+ cluster = metadata.fetch(partition.topic(), -1L);
+ }
+ if(consumerMetadata != null) {
+ requests.add(consumerMetadata);
+ log.trace("Attempting to refresh consumer metadata {} with correlation id {}", consumerMetadata.request(),
+ consumerMetadata.request().header().correlationId());
+ }
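+ // send any pending requests, poll the network client for responses and dispatch each one to its handler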
+ List<ClientResponse> responses = networkClient.poll(requests, timeout, now);
+ for (ClientResponse response : responses) {
+ if (response.wasDisconnected())
+ handleDisconnect(response, now);
+ else
+ handleResponse(response, now);
+ }
+ return records;
}
/**
@@ -570,6 +643,10 @@ public class KafkaConsumer implements Consumer {
return Collections.unmodifiableMap(this.metrics.metrics());
}
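+ /**
+ * Return the partitions currently known for the given topic from the consumer's metadata cache. This call does
+ * not block waiting for a metadata update (it fetches with a maxWaitMs of -1).
+ */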
+ public List<PartitionInfo> partitionsFor(String topic) {
+ return this.metadata.fetch(topic, -1L).partitionsForTopic(topic);
+ }
+
@Override
public void close() {
log.trace("Closing the Kafka consumer.");
@@ -578,4 +655,70 @@ public class KafkaConsumer implements Consumer {
this.metrics.close();
log.debug("The Kafka consumer has closed.");
}
+
+ /**
+ * Create a consumer metadata request for the given group
+ */
+ private ClientRequest maybeUpdateConsumerMetadata(long now) {
+ if(consumerMetadataInProgress || !consumerMetadataUpdateNeeded)
+ return null;
+ ConsumerMetadataRequest request = new ConsumerMetadataRequest(this.group);
+ Node destination = this.networkClient.leastLoadedNode(now);
+ if(destination == null) // all nodes are blacked out
+ return null;
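+ // if no connection to the chosen node is ready yet, initiate one and retry on a later poll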
+ if(!networkClient.isReady(destination, now)) {
+ networkClient.ready(destination, now);
+ return null;
+ }
+ RequestSend send = new RequestSend(destination.id(),
+ this.networkClient.nextRequestHeader(ApiKeys.CONSUMER_METADATA),
+ request.toStruct());
+ ClientRequest consumerMetadataRequest = new ClientRequest(now, true, send, null);
+ consumerMetadataInProgress = true;
+ consumerMetadataUpdateNeeded = false;
+ return consumerMetadataRequest;
+ }
+
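+ /**
+ * On a disconnect, mark any in-flight consumer metadata request as failed so it is retried, and force a refresh
+ * of the topic metadata.
+ */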
+ private void handleDisconnect(ClientResponse response, long now) {
+ int correlation = response.request().request().header().correlationId();
+ log.trace("Cancelled request {} with correlation id {} due to node {} being disconnected", response.request(), correlation,
+ response.request().request().destination());
+ if(response.request().request().header().apiKey() == ApiKeys.CONSUMER_METADATA.id) {
+ this.consumerMetadataInProgress = false;
+ this.consumerMetadataUpdateNeeded = true;
+ }
+ metadata.forceUpdate();
+ }
+
+ /**
+ * Handle a response
+ */
+ private void handleResponse(ClientResponse response, long now) {
+ int correlationId = response.request().request().header().correlationId();
+ log.trace("Received {} response {} from node {} with correlation id {}",
+ ApiKeys.forId(response.request().request().header().apiKey()),
+ response.responseBody(),
+ response.request().request().destination(),
+ correlationId);
+ // if we have a response, parse it
+ if (response.hasResponse()) {
+ if(response.request().request().header().apiKey() == ApiKeys.CONSUMER_METADATA.id)
+ handleConsumerMetadataResponse(response, now);
+ else
+ throw new IllegalStateException("Unrecognized API in response " + ApiKeys.forId(response.request().request().header().apiKey()));
+ } else {
+ // every response for a consumer's request should have response content, unless there was a network disconnect
+ throw new IllegalStateException("Response for " + ApiKeys.forId(response.request().request().header().apiKey()) + " returned with empty content");
+ }
+ }
+
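+ /**
+ * Parse a consumer metadata response: on success record the coordinator node, otherwise schedule another lookup.
+ */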
+ private void handleConsumerMetadataResponse(ClientResponse response, long now) {
+ ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(response.responseBody());
+ if(consumerMetadataResponse.error() == Errors.NONE.code())
+ this.consumerCoordinator = consumerMetadataResponse.coordinator();
+ else
+ this.consumerMetadataUpdateNeeded = true;
+ this.consumerMetadataInProgress = false;
+ return;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 57bc285..e8e2904 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -74,7 +74,7 @@ public final class Metadata {
* Fetch cluster metadata including partitions for the given topic. If there is no metadata for the given topic,
* block waiting for an update.
* @param topic The topic we want metadata for
- * @param maxWaitMs The maximum amount of time to block waiting for metadata
+ * @param maxWaitMs The maximum amount of time to block waiting for metadata. -1 indicates no blocking
*/
public synchronized Cluster fetch(String topic, long maxWaitMs) {
List<PartitionInfo> partitions = null;
@@ -87,7 +87,11 @@ public final class Metadata {
forceUpdate = true;
try {
log.trace("Requesting metadata update for topic {}.", topic);
- wait(remainingWaitMs);
+ if (maxWaitMs != -1L) {
+ log.trace("Waiting {} ms for a metadata update for topic {}", remainingWaitMs, topic);
+ wait(remainingWaitMs);
+ } else
+ return cluster;
} catch (InterruptedException e) { /* this is fine, just try again */
}
long elapsed = System.currentTimeMillis() - begin;
diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
index b15aa2c..b16c914 100644
--- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
+++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
@@ -12,6 +12,9 @@
*/
package org.apache.kafka.common;
+import java.util.Arrays;
+import java.util.Set;
+
/**
* Information about a topic-partition.
*/
@@ -20,10 +23,10 @@ public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
- private final Node[] replicas;
- private final Node[] inSyncReplicas;
+ private final Set<Node> replicas;
+ private final Set<Node> inSyncReplicas;
- public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
+ public PartitionInfo(String topic, int partition, Node leader, Set<Node> replicas, Set<Node> inSyncReplicas) {
this.topic = topic;
this.partition = partition;
this.leader = leader;
@@ -55,7 +58,7 @@ public class PartitionInfo {
/**
* The complete set of replicas for this partition regardless of whether they are alive or up-to-date
*/
- public Node[] replicas() {
+ public Set<Node> replicas() {
return replicas;
}
@@ -63,11 +66,45 @@ public class PartitionInfo {
* The subset of the replicas that are in sync, that is caught-up to the leader and ready to take over as leader if
* the leader should fail
*/
- public Node[] inSyncReplicas() {
+ public Set<Node> inSyncReplicas() {
return inSyncReplicas;
}
@Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + topic.hashCode();
+ result = prime * result + partition;
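+ // the leader may be null if the partition currently has no leader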
+ result = prime * result + (leader == null ? 0 : leader.hashCode());
+ result = prime * result + replicas.hashCode();
+ result = prime * result + inSyncReplicas.hashCode();
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ PartitionInfo other = (PartitionInfo) obj;
+ if (!topic.equals(other.topic))
+ return false;
+ if (partition != other.partition)
+ return false;
+ if (leader == null ? other.leader != null : !leader.equals(other.leader))
+ return false;
+ if (!replicas.equals(other.replicas))
+ return false;
+ if (!inSyncReplicas.equals(other.inSyncReplicas))
+ return false;
+ return true;
+ }
+
+ @Override
public String toString() {
return String.format("Partition(topic = %s, partition = %d, leader = %d, replicas = %s, isr = %s",
topic,
@@ -78,15 +115,14 @@ public class PartitionInfo {
}
/* Extract the node ids from each item in the array and format for display */
- private String fmtNodeIds(Node[] nodes) {
+ private String fmtNodeIds(Set<Node> nodes) {
StringBuilder b = new StringBuilder("[");
- for (int i = 0; i < nodes.length - 1; i++) {
- b.append(Integer.toString(nodes[i].id()));
- b.append(',');
- }
- if (nodes.length > 0) {
- b.append(Integer.toString(nodes[nodes.length - 1].id()));
- b.append(',');
+ int size = 0;
+ for (Node node : nodes) {
+ b.append(Integer.toString(node.id()));
+ size += 1;
+ if(size < nodes.size())
+ b.append(',');
}
b.append("]");
return b.toString();
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 6fe7573..9ad637a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -17,8 +17,6 @@
package org.apache.kafka.common.protocol;
-import java.util.ArrayList;
-import java.util.List;
/**
* Identifiers for all the Kafka APIs
@@ -30,8 +28,11 @@ public enum ApiKeys {
METADATA(3, "metadata"),
LEADER_AND_ISR(4, "leader_and_isr"),
STOP_REPLICA(5, "stop_replica"),
- OFFSET_COMMIT(6, "offset_commit"),
- OFFSET_FETCH(7, "offset_fetch");
+ UPDATE_METADATA(6, "update_metadata"),
+ CONTROLLED_SHUTDOWN(7, "controlled_shutdown"),
+ OFFSET_COMMIT(8, "offset_commit"),
+ OFFSET_FETCH(9, "offset_fetch"),
+ CONSUMER_METADATA(10, "consumer_metadata");
private static ApiKeys[] codeToType;
public static int MAX_API_KEY = -1;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 044b030..c4ac1fd 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -101,6 +101,14 @@ public class Protocol {
new Field("base_offset",
INT64))))))));
+ /* Consumer metadata api */
+ public static Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group", STRING));
+ public static Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+ new Field("coordinator", BROKER));
+
+ public static Schema[] CONSUMER_METADATA_REQUEST = new Schema[] { CONSUMER_METADATA_REQUEST_V0 };
+ public static Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] { CONSUMER_METADATA_RESPONSE_V0 };
+
public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 };
public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 };
@@ -118,8 +126,11 @@ public class Protocol {
REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+ REQUESTS[ApiKeys.UPDATE_METADATA.id] = new Schema[] {};
+ REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN.id] = new Schema[] {};
REQUESTS[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
REQUESTS[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
+ REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
RESPONSES[ApiKeys.FETCH.id] = new Schema[] {};
@@ -127,8 +138,11 @@ public class Protocol {
RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+ RESPONSES[ApiKeys.UPDATE_METADATA.id] = new Schema[] {};
+ RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN.id] = new Schema[] {};
RESPONSES[ApiKeys.OFFSET_COMMIT.id] = new Schema[] {};
RESPONSES[ApiKeys.OFFSET_FETCH.id] = new Schema[] {};
+ RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
/* set the maximum version of each api */
for (ApiKeys api : ApiKeys.values())
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
new file mode 100644
index 0000000..cd28077
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
@@ -0,0 +1,20 @@
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Struct;
+
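+/**
+ * A request to look up the coordinator broker for the given consumer group.
+ */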
+public class ConsumerMetadataRequest {
+
+ private final String group;
+
+ public ConsumerMetadataRequest(String group) {
+ this.group = group;
+ }
+
+ public Struct toStruct() {
+ Struct consumerMetadata = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id));
+ consumerMetadata.set("group", this.group);
+ return consumerMetadata;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
new file mode 100644
index 0000000..3ea047e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
@@ -0,0 +1,44 @@
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.types.Struct;
+
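+/**
+ * The response to a consumer metadata request, carrying an error code and the coordinator node for the group.
+ */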
+public class ConsumerMetadataResponse {
+ private final short errorCode;
+ private final Node coordinator;
+
+ public ConsumerMetadataResponse() {
+ coordinator = null;
+ errorCode = 0;
+ }
+
+ public ConsumerMetadataResponse(Struct struct) {
+ this.errorCode = (Short) struct.get("error_code");
+ Struct coordinatorStruct = (Struct) struct.get("coordinator");
+ int nodeId = (Integer) coordinatorStruct.get("node_id");
+ String host = (String) coordinatorStruct.get("host");
+ int port = (Integer) coordinatorStruct.get("port");
+ this.coordinator = new Node(nodeId, host, port);
+ }
+
+ public Node coordinator() {
+ return this.coordinator;
+ }
+
+ public short error() {
+ return this.errorCode;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder b = new StringBuilder();
+ b.append('{');
+ b.append("coordinator=");
+ b.append(this.coordinator.toString());
+ b.append(',');
+ b.append("error=");
+ b.append(this.errorCode);
+ b.append('}');
+ return b.toString();
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 2652c32..1355560 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -14,8 +14,10 @@ package org.apache.kafka.common.requests;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
@@ -60,13 +62,13 @@ public class MetadataResponse {
int leader = partitionInfo.getInt("leader");
Node leaderNode = leader == -1 ? null : brokers.get(leader);
Object[] replicas = (Object[]) partitionInfo.get("replicas");
- Node[] replicaNodes = new Node[replicas.length];
+ Set<Node> replicaNodes = new HashSet<Node>();
for (int k = 0; k < replicas.length; k++)
- replicaNodes[k] = brokers.get(replicas[k]);
+ replicaNodes.add(brokers.get(replicas[k]));
Object[] isr = (Object[]) partitionInfo.get("isr");
- Node[] isrNodes = new Node[isr.length];
+ Set<Node> isrNodes = new HashSet<Node>();
for (int k = 0; k < isr.length; k++)
- isrNodes[k] = brokers.get(isr[k]);
+ isrNodes.add(brokers.get(isr[k]));
partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
}
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
index f06e28c..df58cdc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
@@ -18,13 +18,12 @@ package org.apache.kafka.clients.producer;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
@@ -39,7 +38,7 @@ public class PartitionerTest {
private Node node0 = new Node(0, "localhost", 99);
private Node node1 = new Node(1, "localhost", 100);
private Node node2 = new Node(2, "localhost", 101);
- private Node[] nodes = new Node[] { node0, node1, node2 };
+ private Set<Node> nodes = new HashSet<Node>(asList(node0, node1, node2));
private String topic = "test";
private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 0, node0, nodes, nodes),
new PartitionInfo(topic, 1, node1, nodes, nodes),
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 76a17e8..1d014fc 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -22,8 +22,10 @@ import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
@@ -50,13 +52,15 @@ public class TestUtils {
}
public static Cluster clusterWith(int nodes, String topic, int partitions) {
- Node[] ns = new Node[nodes];
+ Set<Node> ns = new HashSet<Node>();
for (int i = 0; i < nodes; i++)
- ns[i] = new Node(0, "localhost", 1969);
+ ns.add(new Node(0, "localhost", 1969));
+ Node[] nodeArray = new Node[ns.size()];
+ ns.toArray(nodeArray);
List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
for (int i = 0; i < partitions; i++)
- parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
- return new Cluster(asList(ns), parts);
+ parts.add(new PartitionInfo(topic, i, nodeArray[i % ns.size()], ns, ns));
+ return new Cluster(ns, parts);
}
/**
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerTest.scala
new file mode 100644
index 0000000..3ae1d21
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerTest.scala
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import kafka.server.{KafkaServer, KafkaRequestHandler, KafkaConfig}
+import org.apache.log4j.Logger
+import kafka.zk.ZooKeeperTestHarness
+import org.scalatest.junit.JUnit3Suite
+import collection._
+import kafka.utils.{Utils, TestUtils}
+import java.util.Properties
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.{Node, PartitionInfo}
+import scala.collection.JavaConversions._
+
+/**
+ * End to end tests of the consumer apis against a local server
+ */
+class ConsumerTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness {
+ val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
+
+ private val brokerId1 = 0
+ private val brokerId2 = 1
+ private val ports = TestUtils.choosePorts(2)
+ private val (port1, port2) = (ports(0), ports(1))
+ private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
+ private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
+ private val config1 = new KafkaConfig(props1)
+ private val config2 = new KafkaConfig(props2)
+
+ private val topic = "newtopic"
+ val configs = List(config1, config2)
+ private val nodes = configs.map(config => new Node(config.brokerId, config.hostName, config.port)).toSet
+ private val nodeMap = nodes.map(node => (node.id(), node)).toMap
+
+ private var serverMap: Map[Int, KafkaServer] = new mutable.HashMap[Int, KafkaServer]()
+
+ override def setUp() {
+ super.setUp()
+ serverMap += servers.head.config.brokerId -> servers.head
+ serverMap += servers.last.config.brokerId -> servers.last
+ }
+
+ def testTopicMetadataDiscoveryInConsumer() {
+ // create topic
+ TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+ val leader = TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
+ val props = new Properties()
+ props.put("bootstrap.servers", configs.map(config => config.hostName + ":" + config.port).foldLeft("")((url,str) => url + "," + str))
+ props.put("group.id", "test")
+ val leaderNode = nodeMap(leader)
+ import scala.collection.JavaConversions._
+ val partitionInfo = new PartitionInfo(topic, 0, leaderNode, nodes, nodes)
+ val consumer = new KafkaConsumer(props)
+ TestUtils.waitUntilTrue(() => {
+ val returnedPartitionInfo = pollUntilMetadataRefresh(consumer, topic, 1000)
+ println("Received partition info = " + returnedPartitionInfo + " Compared to " + partitionInfo)
+ if(returnedPartitionInfo != null) {
+ returnedPartitionInfo == partitionInfo
+ }
+ else false
+ }, "Topic metadata was not refreshed", 2000)
+ consumer.close()
+ }
+
+ def testMetadataRefreshAfterFailure() {
+ // create topic
+ TestUtils.createTopic(zkClient, topic, 1, 2, servers)
+ val leader = TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
+ val props = new Properties()
+ props.put("bootstrap.servers", configs.map(config => config.hostName + ":" + config.port).foldLeft("")((url,str) => url + "," + str))
+ props.put("group.id", "test")
+ props.put("metadata.max.age.ms", "500")
+ props.put("reconnect.backoff.ms", "500")
+ val leaderNode = nodeMap(leader)
+ val partitionInfo = new PartitionInfo(topic, 0, leaderNode, nodes, nodes)
+ val consumer = new KafkaConsumer(props)
+ TestUtils.waitUntilTrue(() => {
+ val returnedPartitionInfo = pollUntilMetadataRefresh(consumer, topic, 500)
+ if(returnedPartitionInfo != null) {
+ returnedPartitionInfo == partitionInfo
+ }
+ else false
+ }, "Topic metadata was not refreshed", 1000)
+
+ val leaderServer = serverMap(leader)
+ leaderServer.shutdown()
+ leaderServer.awaitShutdown()
+
+ // double check that the leader info has been propagated after consecutive bounces
+ val newLeader = TestUtils.waitUntilMetadataIsPropagated(servers.filterNot(s => s.config.brokerId == leader), topic, 0, 1000)
+ val newLeaderNode = nodeMap(newLeader)
+ val newPartitionInfo = new PartitionInfo(topic, 0, newLeaderNode, nodes, nodes - leaderNode)
+ TestUtils.waitUntilTrue(() => {
+ val returnedPartitionInfo = pollUntilMetadataRefresh(consumer, topic, 500)
+ if(returnedPartitionInfo != null) {
+ returnedPartitionInfo == newPartitionInfo
+ }
+ else false
+ }, "Topic metadata was not refreshed", 2000)
+ consumer.close()
+ }
+
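+ /**
+ * Poll the consumer repeatedly for roughly the given timeout so it has a chance to refresh its metadata, then
+ * return the first partition it knows about for the topic, or null if it does not know of any yet.
+ */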
+ private def pollUntilMetadataRefresh(consumer: KafkaConsumer, topic: String, timeout: Int): PartitionInfo = {
+ var remainingTime = timeout
+ while(remainingTime >= 0) {
+ consumer.poll(100)
+ remainingTime -= 100
+ Thread.sleep(100)
+ }
+ val partitionsInfo = consumer.partitionsFor(topic)
+ val partitionInfo = if(partitionsInfo != null) partitionsInfo.get(0) else null
+ partitionInfo
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 9f04bd3..cd6e06a 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -32,6 +32,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionExc
import kafka.utils.{StaticPartitioner, TestUtils, Utils}
import kafka.serializer.StringEncoder
import java.util.Properties
+import TestUtils._
/**
* End to end tests of the primitive apis against a local server
@@ -265,15 +266,4 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
}
}
-
- /**
- * For testing purposes, just create these topics each with one partition and one replica for
- * which the provided broker should the leader for. Create and wait for broker to lead. Simple.
- */
- private def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Iterable[String]) {
- for( topic <- topics ) {
- AdminUtils.createTopic(zkClient, topic, partitions = 1, replicationFactor = 1)
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition = 0)
- }
- }
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 57b2bd5..e71f912 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -43,6 +43,7 @@ import kafka.producer.ProducerConfig
import junit.framework.AssertionFailedError
import junit.framework.Assert._
import org.apache.kafka.clients.producer.KafkaProducer
+import collection.Iterable
/**
* Utility functions to help with testing
@@ -682,6 +683,17 @@ object TestUtils extends Logging {
def checkIfReassignPartitionPathExists(zkClient: ZkClient): Boolean = {
ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
}
+
+ /**
+ * For testing purposes, just create these topics each with one partition and one replica for
+ * which the provided broker should be the leader. Create the topics and wait for the broker to become leader.
+ */
+ def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Iterable[String]) {
+ for( topic <- topics ) {
+ AdminUtils.createTopic(zkClient, topic, partitions = 1, replicationFactor = 1)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition = 0)
+ }
+ }
}
object TestZKUtils {