commit 6914016e222a06beec1baa62c7f295919571384e
Author: Jon Lee <jonlee@linkedin.com>
Date:   Wed Jul 11 20:30:24 2018 -0700

    LIKAFKA-18035: Force consumers with pattern subscription to refresh metadata when starting rebalance
    
    RB=1357458
    BUG=LIKAFKA-18035
    G=Kafka-Code-Reviews
    R=dolin
    A=dolin

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index fdf269788..5767c02e8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -126,14 +126,20 @@ public final class Metadata {
     }
 
     /**
+     * Return the next time when the current cluster info can be updated (i.e., backoff time has elapsed).
+     */
+    public synchronized long timeToAllowUpdate(long nowMs) {
+        return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+    }
+
+    /**
      * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
      * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
      * is now
      */
     public synchronized long timeToNextUpdate(long nowMs) {
         long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
-        long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
-        return Math.max(timeToExpire, timeToAllowUpdate);
+        return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index acd5448cc..4371e539a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -298,8 +298,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 // due to a race condition between the initial metadata fetch and the initial rebalance,
                 // we need to ensure that the metadata is fresh before joining initially. This ensures
                 // that we have matched the pattern against the cluster's topics at least once before joining.
-                if (subscriptions.hasPatternSubscription())
+                if (subscriptions.hasPatternSubscription()) {
+                    // Force metadata refresh, as long as the refresh backoff time has passed. This is to avoid a
+                    // situation where this consumer's metadata refresh happens after the current rebalance is done and
+                    // causes another round of rebalance.
+                    if (this.metadata.timeToAllowUpdate(time.milliseconds()) == 0) {
+                        this.metadata.requestUpdate();
+                    }
                     client.ensureFreshMetadata();
+                }
 
                 ensureActiveGroup();
                 now = time.milliseconds();
