From f391f05723255989912dec35cf840308c3a7ff44 Mon Sep 17 00:00:00 2001
From: Jaikiran Pai <jaikiran.pai@gmail.com>
Date: Mon, 5 Jan 2015 12:23:07 +0530
Subject: [PATCH] KAFKA-1836 Don't block forever if metadata.fetch.timeout.ms
 is set to 0

---
 .../kafka/clients/producer/internals/Metadata.java | 13 ++++---
 .../kafka/clients/producer/MetadataTest.java       | 41 ++++++++++++++++++----
 2 files changed, 43 insertions(+), 11 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index 1d30f9e..9f8aaa6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
  * <p>
  * This class is shared by the client thread (for partitioning) and the background sender thread.
  * 
- * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metdata for a
+ * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
  * topic we don't have any metadata for it will trigger a metadata update.
  */
 public final class Metadata {
@@ -99,12 +99,17 @@ public final class Metadata {
     /**
      * Wait for metadata update until the current version is larger than the last version we know of
      */
-    public synchronized void awaitUpdate(int lastVerison, long maxWaitMs) {
+    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) {
+        if (maxWaitMs < 0) {
+            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
+        }
         long begin = System.currentTimeMillis();
         long remainingWaitMs = maxWaitMs;
-        while (this.version <= lastVerison) {
+        while (this.version <= lastVersion) {
             try {
-                wait(remainingWaitMs);
+                if (maxWaitMs != 0) {
+                    wait(remainingWaitMs);
+                }
             } catch (InterruptedException e) { /* this is fine */
             }
             long elapsed = System.currentTimeMillis() - begin;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
index 4547bfc..74605c3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
@@ -3,24 +3,23 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.producer;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.kafka.clients.producer.internals.Metadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class MetadataTest {
 
     private long refreshBackoffMs = 100;
@@ -49,13 +48,42 @@ public class MetadataTest {
         assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0);
     }
 
+    /**
+     * Tests that {@link org.apache.kafka.clients.producer.internals.Metadata#awaitUpdate(int, long)} doesn't
+     * wait forever with a max timeout value of 0
+     *
+     * @throws Exception
+     * @see https://issues.apache.org/jira/browse/KAFKA-1836
+     */
+    @Test
+    public void testMetadataUpdateWaitTime() throws Exception {
+        long time = 0;
+        metadata.update(Cluster.empty(), time);
+        assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
+        // first try with a max wait time of 0 and ensure that this returns back without waiting forever
+        try {
+            metadata.awaitUpdate(metadata.requestUpdate(), 0);
+            fail("Wait on metadata update was expected to timeout, but it didn't");
+        } catch (TimeoutException te) {
+            // expected
+        }
+        // now try with a higher timeout value once
+        final long TWO_SECOND_WAIT = 2000;
+        try {
+            metadata.awaitUpdate(metadata.requestUpdate(), TWO_SECOND_WAIT);
+            fail("Wait on metadata update was expected to timeout, but it didn't");
+        } catch (TimeoutException te) {
+            // expected
+        }
+    }
+
     private Thread asyncFetch(final String topic) {
         Thread thread = new Thread() {
             public void run() {
                 while (metadata.fetch().partitionsForTopic(topic) == null) {
                     try {
                         metadata.awaitUpdate(metadata.requestUpdate(), refreshBackoffMs);
-                    } catch(TimeoutException e) {
+                    } catch (TimeoutException e) {
                         // let it go
                     }
                 }
@@ -64,5 +92,4 @@ public class MetadataTest {
         thread.start();
         return thread;
     }
-
 }
-- 
1.8.1.2

