diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index d21f922..f739279 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -309,7 +309,7 @@ public class NetworkClient implements KafkaClient { } // we got a disconnect so we should probably refresh our metadata and see if that broker is dead if (this.selector.disconnected().size() > 0) - this.metadata.forceUpdate(); + this.metadata.requestUpdate(); } /** @@ -375,7 +375,7 @@ public class NetworkClient implements KafkaClient { /* attempt failed, we'll try again after the backoff */ connectionStates.disconnected(node.id()); /* maybe the problem is our metadata, update it */ - metadata.forceUpdate(); + metadata.requestUpdate(); log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index d85ca30..44f6c81 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -73,6 +74,7 @@ public class KafkaProducer implements Producer { private final Thread ioThread; private final CompressionType compressionType; private final Sensor errors; + private final Time time; /** * A producer 
is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings @@ -94,7 +96,7 @@ public class KafkaProducer implements Producer { private KafkaProducer(ProducerConfig config) { log.trace("Starting the Kafka producer"); - Time time = new SystemTime(); + this.time = new SystemTime(); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); @@ -119,7 +121,7 @@ public class KafkaProducer implements Producer { metrics, time); List addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); - this.metadata.update(Cluster.bootstrap(addresses), 0); + this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); NetworkClient client = new NetworkClient(new Selector(this.metrics, time), this.metadata, @@ -225,8 +227,9 @@ public class KafkaProducer implements Producer { @Override public Future send(ProducerRecord record, Callback callback) { try { - Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs); - int partition = partitioner.partition(record, cluster); + // first make sure the metadata for the topic is available + waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs); + int partition = partitioner.partition(record, metadata.fetch()); int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value()); ensureValidRecordSize(serializedSize); TopicPartition tp = new TopicPartition(record.topic(), partition); @@ -256,6 +259,27 @@ public class KafkaProducer implements Producer { } /** + * Wait for cluster metadata including partitions for the given topic to be available. 
+ * @param topic The topic we want metadata for + * @param maxWaitMs The maximum time in ms for waiting on the metadata + * @throws TimeoutException if the topic metadata is still unavailable once maxWaitMs has elapsed + */ + private void waitOnMetadata(String topic, long maxWaitMs) { + long untilMs = time.milliseconds() + maxWaitMs; + while (!metadata.checkForTopic(topic)) { + log.trace("Requesting metadata update for topic {}.", topic); + metadata.add(topic); + /* flag the update before waking the sender so that the request is already visible + * (through timeToNextUpdate) by the time the sender wakes up */ + metadata.requestUpdate(); + sender.wakeup(); + metadata.forceUpdate(untilMs); + if (untilMs < time.milliseconds()) + throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); + } + } + + /** * Validate that the record size isn't too large */ private void ensureValidRecordSize(int size) { @@ -271,8 +295,10 @@ public class KafkaProducer implements Producer { " configuration."); } + @Override public List partitionsFor(String topic) { - return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsForTopic(topic); + waitOnMetadata(topic, this.metadataFetchTimeoutMs); + return this.metadata.fetch().partitionsForTopic(topic); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 8890aa2..ea87d3f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -13,12 +13,9 @@ package org.apache.kafka.clients.producer.internals; import java.util.HashSet; -import java.util.List; import java.util.Set; import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.errors.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +35,7 @@ public final class Metadata { private final long metadataExpireMs; private long lastRefreshMs; private Cluster cluster; - private boolean forceUpdate; + private boolean
needUpdate; private final Set topics; /** @@ -59,7 +56,7 @@ public final class Metadata { this.metadataExpireMs = metadataExpireMs; this.lastRefreshMs = 0L; this.cluster = Cluster.empty(); - this.forceUpdate = false; + this.needUpdate = false; this.topics = new HashSet(); } @@ -71,33 +68,17 @@ public final class Metadata { } /** - * Fetch cluster metadata including partitions for the given topic. If there is no metadata for the given topic, - * block waiting for an update. - * @param topic The topic we want metadata for - * @param maxWaitMs The maximum amount of time to block waiting for metadata + * Check whether partition info for the given topic is present in the current cluster metadata, without waiting for an update */ - public synchronized Cluster fetch(String topic, long maxWaitMs) { - List partitions = null; - long begin = System.currentTimeMillis(); - long remainingWaitMs = maxWaitMs; - do { - partitions = cluster.partitionsForTopic(topic); - if (partitions == null) { - topics.add(topic); - forceUpdate = true; - try { - log.trace("Requesting metadata update for topic {}.", topic); - wait(remainingWaitMs); - } catch (InterruptedException e) { /* this is fine, just try again */ - } - long elapsed = System.currentTimeMillis() - begin; - if (elapsed >= maxWaitMs) - throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); - remainingWaitMs = maxWaitMs - elapsed; - } else { - return cluster; - } - } while (true); + public synchronized boolean checkForTopic(String topic) { + return cluster.partitionsForTopic(topic) != null; + } + + /** + * Add the topic to the set of topics tracked by this metadata + */ + public synchronized void add(String topic) { + topics.add(topic); } /** @@ -106,16 +87,32 @@ public final class Metadata { * been request then the expiry time is now */ public synchronized long timeToNextUpdate(long nowMs) { - long timeToExpire = forceUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0); + long timeToExpire = needUpdate ?
0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0); long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs; return Math.max(timeToExpire, timeToAllowUpdate); } /** - * Force an update of the current cluster info + * Request an update of the current cluster metadata info + */ + public synchronized void requestUpdate() { + this.needUpdate = true; + } + + /** + * Force an update of the current cluster info and block until the next update, an interrupt, or the deadline waitUntilMs (epoch ms), whichever comes first */ - public synchronized void forceUpdate() { - this.forceUpdate = true; + public synchronized void forceUpdate(long waitUntilMs) { + long remainingMs = waitUntilMs - System.currentTimeMillis(); + /* deadline already reached; this also avoids wait(0), which blocks until notified */ + if (remainingMs <= 0) + return; + + this.needUpdate = true; + try { + wait(remainingMs); + } catch (InterruptedException e) { /* this is fine */ + } } /** @@ -129,7 +126,7 @@ public final class Metadata { * Update the cluster metadata */ public synchronized void update(Cluster cluster, long now) { - this.forceUpdate = false; + this.needUpdate = false; this.lastRefreshMs = now; this.cluster = cluster; notifyAll(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 37b9d1a..a016269 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -147,7 +147,7 @@ public class Sender implements Runnable { // if there are any partitions whose leaders are not known yet, force metadata update if (result.unknownLeadersExist) - this.metadata.forceUpdate(); + this.metadata.requestUpdate(); // remove any nodes we aren't ready to send to Iterator iter = result.readyNodes.iterator(); @@ -252,7 +252,7 @@ public class Sender implements Runnable { this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount); } if (error.exception() instanceof InvalidMetadataException)
- metadata.forceUpdate(); + metadata.requestUpdate(); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java index 0d7d04c..99cbf91 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java @@ -31,7 +31,7 @@ public class MetadataTest { long time = 0; metadata.update(Cluster.empty(), time); assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); - metadata.forceUpdate(); + metadata.requestUpdate(); assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0); time += refreshBackoffMs; assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0); @@ -40,7 +40,9 @@ public class MetadataTest { Thread t2 = asyncFetch(topic); assertTrue("Awaiting update", t1.isAlive()); assertTrue("Awaiting update", t2.isAlive()); - metadata.update(TestUtils.singletonCluster(topic, 1), time); + // keep updating the metadata until timeToNextUpdate() reports that no update is due + while (metadata.timeToNextUpdate(time) == 0) + metadata.update(TestUtils.singletonCluster(topic, 1), time); t1.join(); t2.join(); assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); @@ -51,7 +53,8 @@ public class MetadataTest { private Thread asyncFetch(final String topic) { Thread thread = new Thread() { public void run() { - metadata.fetch(topic, Integer.MAX_VALUE); + while (!metadata.checkForTopic(topic)) + metadata.forceUpdate(Long.MAX_VALUE); } }; thread.start();