From f005d86524ea3d68189325e6aff9a799afc99a7e Mon Sep 17 00:00:00 2001
From: Jaikiran Pai <jaikiran.pai@gmail.com>
Date: Mon, 5 Jan 2015 12:23:07 +0530
Subject: [PATCH] KAFKA-1836 Don't block forever if metadata.fetch.timeout.ms
 is set to 0

---
 .../kafka/clients/producer/ProducerConfig.java     |  4 +-
 .../kafka/clients/producer/internals/Metadata.java | 18 +++++-
 .../kafka/clients/producer/MetadataTest.java       | 75 +++++++++++++++++++++-
 3 files changed, 89 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index a893d88..2fd6967 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -49,7 +49,7 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>metadata.fetch.timeout.ms</code> */
     public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
     private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the " + "topic's partitions. This configuration controls the maximum amount of time we will block waiting for the metadata "
-                                                             + "fetch to succeed before throwing an exception back to the client.";
+                                                             + "fetch to succeed before throwing an exception back to the client. If a value for this configuration is specified, it must be greater than 0.";
 
     /** <code>metadata.max.age.ms</code> */
     public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
@@ -212,7 +212,7 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(METADATA_FETCH_TIMEOUT_CONFIG,
                                         Type.LONG,
                                         60 * 1000,
-                                        atLeast(0),
+                                        atLeast(1),
                                         Importance.LOW,
                                         METADATA_FETCH_TIMEOUT_DOC)
                                 .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 1d30f9e..6459ea1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
  * <p>
  * This class is shared by the client thread (for partitioning) and the background sender thread.
  * 
- * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metdata for a
+ * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
  * topic we don't have any metadata for it will trigger a metadata update.
  */
 public final class Metadata {
@@ -99,10 +99,22 @@ public final class Metadata {
     /**
      * Wait for metadata update until the current version is larger than the last version we know of
      */
-    public synchronized void awaitUpdate(int lastVerison, long maxWaitMs) {
+    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) {
+        // the current version is already newer than the one passed in, so just return
+        // and consider the metadata update complete.
+        if (this.version > lastVersion) {
+            return;
+        }
+        // The current version isn't newer than the passed version, but the max time to wait
+        // for updates is <= 0, which effectively means we can't wait for a metadata update.
+        // Nor can we return normally from this method, since that would be treated as a
+        // successful update of the metadata. Throw an exception instead.
+        if (maxWaitMs <= 0) {
+            throw new IllegalArgumentException("Max time to wait for metadata updates should be > 0 milliseconds");
+        }
         long begin = System.currentTimeMillis();
         long remainingWaitMs = maxWaitMs;
-        while (this.version <= lastVerison) {
+        while (this.version <= lastVersion) {
             try {
                 wait(remainingWaitMs);
             } catch (InterruptedException e) { /* this is fine */
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
index 4547bfc..a711622 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
@@ -12,15 +12,23 @@
  */
 package org.apache.kafka.clients.producer;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.kafka.clients.producer.internals.Metadata;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
 public class MetadataTest {
 
     private long refreshBackoffMs = 100;
@@ -49,6 +57,46 @@ public class MetadataTest {
         assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0);
     }
 
+    /**
+     * Tests that {@link org.apache.kafka.clients.producer.internals.Metadata#awaitUpdate(int, long)} doesn't allow a max wait time of <= 0.
+     *
+     * @see <a href="https://issues.apache.org/jira/browse/KAFKA-1836">KAFKA-1836</a>
+     * @throws Exception if the test fails unexpectedly
+     */
+    @Test
+    public void testMetadataUpdateWaitTime() throws Exception {
+        long time = 0;
+        metadata.update(Cluster.empty(), time);
+        assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
+        final String topicName = "my-new-topic";
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+        try {
+            // first try with an invalid max wait time (of 0)
+            final Future<List<PartitionInfo>> resultOfInvalidWaitTime = executorService.submit(new TopicPartitionFetcher(topicName, 0));
+            try {
+                resultOfInvalidWaitTime.get(2, TimeUnit.SECONDS);
+                // we expected this to fail with an IllegalArgumentException during wait, but it didn't
+                fail("Max wait time of <=0 for metadata updates shouldn't have been considered valid");
+            } catch (ExecutionException ee) {
+                assertTrue("Unexpected execution exception while waiting for metadata update: " + ee.getCause().getMessage(), (ee.getCause() instanceof IllegalArgumentException));
+            }
+            // Now try with a valid timeout
+            // wait for a maximum of 5 seconds for partition information to be updated for the topic, in the metadata
+            final Callable<List<PartitionInfo>> topicPartitionFetcher = new TopicPartitionFetcher(topicName, 5000);
+            // trigger the task for waiting for metadata updates to the topic
+            final Future<List<PartitionInfo>> partitionInfoFuture = executorService.submit(topicPartitionFetcher);
+            // create the topic with the partitions
+            metadata.update(TestUtils.singletonCluster(topicName, 1), time);
+            // get the partitions for the topic
+            final List<PartitionInfo> partitions = partitionInfoFuture.get(10, TimeUnit.SECONDS);
+            assertNotNull("No partition info was available for topic '" + topicName + "'", partitions);
+            assertEquals("Unexpected number of partitions found for topic '" + topicName + "'", 1, partitions.size());
+        } finally {
+            executorService.shutdown();
+        }
+    }
+
     private Thread asyncFetch(final String topic) {
         Thread thread = new Thread() {
             public void run() {
@@ -65,4 +113,25 @@ public class MetadataTest {
         return thread;
     }
 
+    private class TopicPartitionFetcher implements Callable<List<PartitionInfo>> {
+
+        private final String topicName;
+        private final long waitTimeInMillis;
+
+        TopicPartitionFetcher(final String topicName, final long waitTimeInMillis) {
+            this.topicName = topicName;
+            this.waitTimeInMillis = waitTimeInMillis;
+        }
+
+        @Override
+        public List<PartitionInfo> call() throws Exception {
+            final List<PartitionInfo> partitions = metadata.fetch().partitionsForTopic(topicName);
+            if (partitions == null) {
+                // wait for topic metadata update
+                metadata.awaitUpdate(metadata.requestUpdate(), this.waitTimeInMillis);
+            }
+            // we waited for the maximum amount of time we were told to; now return the partitions (if any) for the topic
+            return metadata.fetch().partitionsForTopic(topicName);
+        }
+    }
 }
-- 
1.8.1.2

