diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index d21f922..f739279 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -309,7 +309,7 @@ public class NetworkClient implements KafkaClient { } // we got a disconnect so we should probably refresh our metadata and see if that broker is dead if (this.selector.disconnected().size() > 0) - this.metadata.forceUpdate(); + this.metadata.requestUpdate(); } /** @@ -375,7 +375,7 @@ public class NetworkClient implements KafkaClient { /* attempt failed, we'll try again after the backoff */ connectionStates.disconnected(node.id()); /* maybe the problem is our metadata, update it */ - metadata.forceUpdate(); + metadata.requestUpdate(); log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index d85ca30..f58b850 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -73,6 +74,7 @@ public class KafkaProducer implements Producer { private final Thread ioThread; private final CompressionType compressionType; private final Sensor errors; + private final Time time; /** * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -94,7 +96,7 @@ public class KafkaProducer implements Producer { private KafkaProducer(ProducerConfig config) { log.trace("Starting the Kafka producer"); - Time time = new SystemTime(); + this.time = new SystemTime(); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); @@ -119,7 +121,7 @@ public class KafkaProducer implements Producer { metrics, time); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); - this.metadata.update(Cluster.bootstrap(addresses), 0); + this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); NetworkClient client = new NetworkClient(new Selector(this.metrics, time), this.metadata, @@ -225,8 +227,9 @@ public class KafkaProducer implements Producer { @Override public Future send(ProducerRecord record, Callback callback) { try { - Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs); - int partition = partitioner.partition(record, cluster); + // first make sure the metadata for the topic is available + waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs); + int partition = partitioner.partition(record, metadata.fetch()); int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value()); ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); @@ -256,6 +259,31 @@ public class KafkaProducer implements Producer { } /** + * Wait for cluster metadata including partitions for the given topic to be available. + * @param topic The topic we want metadata for + * @param maxWaitMs The maximum time in ms for waiting on the metadata + */ + private void waitOnMetadata(String topic, long maxWaitMs) { + if (metadata.fetch().partitionsForTopic(topic) != null) { + return; + } else { + long begin = time.milliseconds(); + long remainingWaitMs = maxWaitMs; + while (metadata.fetch().partitionsForTopic(topic) == null) { + log.trace("Requesting metadata update for topic {}.", topic); + int version = metadata.requestUpdate(); + metadata.add(topic); + sender.wakeup(); + metadata.awaitUpdate(version, remainingWaitMs); + long elapsed = time.milliseconds() - begin; + if (elapsed >= maxWaitMs) + throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); + remainingWaitMs = maxWaitMs - elapsed; + } + } + } + + /** * Validate that the record size isn't too large */ private void ensureValidRecordSize(int size) { @@ -271,8 +299,10 @@ public class KafkaProducer implements Producer { " configuration."); } + @Override public List partitionsFor(String topic) { - return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsForTopic(topic); + waitOnMetadata(topic, this.metadataFetchTimeoutMs); + return this.metadata.fetch().partitionsForTopic(topic); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 8890aa2..140237f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -13,11 +13,9 @@ package org.apache.kafka.clients.producer.internals; import java.util.HashSet; -import java.util.List; import java.util.Set; import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.errors.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,9 +34,10 @@ public final class Metadata { private final long refreshBackoffMs; private final long metadataExpireMs; + private int version; private long lastRefreshMs; private Cluster cluster; - private boolean forceUpdate; + private boolean needUpdate; private final Set topics; /** @@ -58,8 +57,9 @@ public final class Metadata { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; this.lastRefreshMs = 0L; + this.version = 0; this.cluster = Cluster.empty(); - this.forceUpdate = false; + this.needUpdate = false; this.topics = new HashSet(); } @@ -71,33 +71,10 @@ public final class Metadata { } /** - * Fetch cluster metadata including partitions for the given topic. If there is no metadata for the given topic, - * block waiting for an update. - * @param topic The topic we want metadata for - * @param maxWaitMs The maximum amount of time to block waiting for metadata + * Add the topic to maintain in the metadata */ - public synchronized Cluster fetch(String topic, long maxWaitMs) { - List partitions = null; - long begin = System.currentTimeMillis(); - long remainingWaitMs = maxWaitMs; - do { - partitions = cluster.partitionsForTopic(topic); - if (partitions == null) { - topics.add(topic); - forceUpdate = true; - try { - log.trace("Requesting metadata update for topic {}.", topic); - wait(remainingWaitMs); - } catch (InterruptedException e) { /* this is fine, just try again */ - } - long elapsed = System.currentTimeMillis() - begin; - if (elapsed >= maxWaitMs) - throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); - remainingWaitMs = maxWaitMs - elapsed; - } else { - return cluster; - } - } while (true); + public synchronized void add(String topic) { + topics.add(topic); } /** @@ -106,16 +83,35 @@ public final class Metadata { * been request then the expiry time is now */ public synchronized long timeToNextUpdate(long nowMs) { - long timeToExpire = forceUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0); + long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0); long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs; return Math.max(timeToExpire, timeToAllowUpdate); } /** - * Force an update of the current cluster info + * Request an update of the current cluster metadata info, return the current version before the update */ - public synchronized void forceUpdate() { - this.forceUpdate = true; + public synchronized int requestUpdate() { + this.needUpdate = true; + return this.version; + } + + /** + * Wait for metadata update until the current version is larger than the last version we know of + */ + public synchronized void awaitUpdate(int lastVerison, long maxWaitMs) { + long begin = System.currentTimeMillis(); + long remainingWaitMs = maxWaitMs; + while (this.version <= lastVerison) { + try { + wait(remainingWaitMs); + } catch (InterruptedException e) { /* this is fine */ + } + long elapsed = System.currentTimeMillis() - begin; + if (elapsed >= maxWaitMs) + throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); + remainingWaitMs = maxWaitMs - elapsed; + } } /** @@ -129,8 +125,9 @@ public final class Metadata { * Update the cluster metadata */ public synchronized void update(Cluster cluster, long now) { - this.forceUpdate = false; + this.needUpdate = false; this.lastRefreshMs = now; + this.version += 1; this.cluster = cluster; notifyAll(); log.debug("Updated cluster metadata to {}", cluster); @@ -142,5 +139,4 @@ public final class Metadata { public synchronized long lastUpdate() { return this.lastRefreshMs; } - } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 37b9d1a..a016269 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -147,7 +147,7 @@ public class Sender implements Runnable { // if there are any partitions whose leaders are not known yet, force metadata update if (result.unknownLeadersExist) - this.metadata.forceUpdate(); + this.metadata.requestUpdate(); // remove any nodes we aren't ready to send to Iterator iter = result.readyNodes.iterator(); @@ -252,7 +252,7 @@ public class Sender implements Runnable { this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); } if (error.exception() instanceof InvalidMetadataException) - metadata.forceUpdate(); + metadata.requestUpdate(); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java index 0d7d04c..543304c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java @@ -31,7 +31,7 @@ public class MetadataTest { long time = 0; metadata.update(Cluster.empty(), time); assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); - metadata.forceUpdate(); + metadata.requestUpdate(); assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0); time += refreshBackoffMs; assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0); @@ -40,7 +40,9 @@ public class MetadataTest { Thread t2 = asyncFetch(topic); assertTrue("Awaiting update", t1.isAlive()); assertTrue("Awaiting update", t2.isAlive()); - metadata.update(TestUtils.singletonCluster(topic, 1), time); + // keep updating the metadata until no need to + while (metadata.timeToNextUpdate(time) == 0) + metadata.update(TestUtils.singletonCluster(topic, 1), time); t1.join(); t2.join(); assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); @@ -51,7 +53,8 @@ public class MetadataTest { private Thread asyncFetch(final String topic) { Thread thread = new Thread() { public void run() { - metadata.fetch(topic, Integer.MAX_VALUE); + while (metadata.fetch().partitionsForTopic(topic) == null) + metadata.awaitUpdate(metadata.requestUpdate(), Long.MAX_VALUE); } }; thread.start();