diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index d85ca30..687eac9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -119,7 +120,7 @@ public class KafkaProducer implements Producer { metrics, time); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); - this.metadata.update(Cluster.bootstrap(addresses), 0); + this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); NetworkClient client = new NetworkClient(new Selector(this.metrics, time), this.metadata, @@ -225,8 +226,9 @@ public class KafkaProducer implements Producer { @Override public Future send(ProducerRecord record, Callback callback) { try { - Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs); - int partition = partitioner.partition(record, cluster); + // first make sure the metadata for the topic is available + waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs); + int partition = partitioner.partition(record, metadata.fetch()); int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value()); ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); @@ -256,6 +258,28 @@ public class KafkaProducer implements Producer { } /** + * Wait for cluster metadata including partitions for the given topic to be available. + * @param topic The topic we want metadata for + * @param maxWaitMs The maximum amount of time to block waiting for metadata + */ + private void waitOnMetadata(String topic, long maxWaitMs) { + long begin = System.currentTimeMillis(); + long remainingWaitMs = maxWaitMs; + while (!metadata.checkForTopic(topic)) { + metadata.add(topic); + metadata.forceUpdate(); + sender.wakeup(); + log.trace("Requesting metadata update for topic {}.", topic); + if (metadata.waitForTopic(topic, remainingWaitMs)) + return; + long elapsed = System.currentTimeMillis() - begin; + if (elapsed >= maxWaitMs) + throw new TimeoutException("Failed to update metadata after " + this.metadataFetchTimeoutMs + " ms."); + remainingWaitMs = maxWaitMs - elapsed; + } + } + + /** * Validate that the record size isn't too large */ private void ensureValidRecordSize(int size) { @@ -271,8 +295,10 @@ public class KafkaProducer implements Producer { " configuration."); } + @Override public List partitionsFor(String topic) { - return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsForTopic(topic); + waitOnMetadata(topic, this.metadataFetchTimeoutMs); + return this.metadata.fetch().partitionsForTopic(topic); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 8890aa2..367ff8c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -13,12 +13,9 @@ package org.apache.kafka.clients.producer.internals; import java.util.HashSet; -import java.util.List; import java.util.Set; import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.errors.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,33 +68,34 @@ public final class Metadata { } /** - * Fetch cluster metadata including partitions for the given topic. If there is no metadata for the given topic, - * block waiting for an update. - * @param topic The topic we want metadata for - * @param maxWaitMs The maximum amount of time to block waiting for metadata + * Check immediately if the metadata for a specific topic is available */ - public synchronized Cluster fetch(String topic, long maxWaitMs) { - List partitions = null; - long begin = System.currentTimeMillis(); - long remainingWaitMs = maxWaitMs; - do { - partitions = cluster.partitionsForTopic(topic); - if (partitions == null) { - topics.add(topic); - forceUpdate = true; - try { - log.trace("Requesting metadata update for topic {}.", topic); - wait(remainingWaitMs); - } catch (InterruptedException e) { /* this is fine, just try again */ - } - long elapsed = System.currentTimeMillis() - begin; - if (elapsed >= maxWaitMs) - throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); - remainingWaitMs = maxWaitMs - elapsed; - } else { - return cluster; + public synchronized boolean checkForTopic(String topic) { + return cluster.partitionsForTopic(topic) != null; + } + + /** + * Check if the metadata for a specific topic is available, if not + * wait for the next refresh or time has elapsed then re-check again + */ + public synchronized boolean waitForTopic(String topic, long maxWaitMs) { + if (checkForTopic(topic)) { + return true; + } else { + try { + log.trace("Start waiting for topic {}...", topic); + wait(maxWaitMs); + } catch (InterruptedException e) { /* this is fine */ } - } while (true); + return checkForTopic(topic); + } + } + + /** + * Add the topic to maintain in the metadata + */ + public synchronized void add(String topic) { + topics.add(topic); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java index 0d7d04c..dd33615 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java @@ -51,7 +51,7 @@ public class MetadataTest { private Thread asyncFetch(final String topic) { Thread thread = new Thread() { public void run() { - metadata.fetch(topic, Integer.MAX_VALUE); + while (!metadata.waitForTopic(topic, Long.MAX_VALUE)); } }; thread.start();