diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index b72769e..9ba4e5c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -387,6 +387,7 @@ public abstract class AbstractCoordinator implements Closeable { synchronized (AbstractCoordinator.this) { log.info("Successfully joined group {} with generation {}", groupId, generation.generationId); state = MemberState.STABLE; + AbstractCoordinator.this.rejoinNeeded = false; if (heartbeatThread != null) heartbeatThread.enable(); @@ -446,7 +447,7 @@ public abstract class AbstractCoordinator implements Closeable { } else { AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(), joinResponse.memberId(), joinResponse.groupProtocol()); - AbstractCoordinator.this.rejoinNeeded = false; + //AbstractCoordinator.this.rejoinNeeded = false; if (joinResponse.isLeader()) { onJoinLeader(joinResponse).chain(future); } else {