Index: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (revision 1874f2388cffa7a1e866cbe4aff8b92c9d953b41) +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (revision 225eab98d939beb5ce797c9f50af22fc95fb3445) @@ -1231,8 +1231,11 @@ client.maybeTriggerWakeup(); if (includeMetadataInTimeout) { - // try to update assignment metadata BUT do not need to block on the timer for join group - updateAssignmentMetadataIfNeeded(timer, false); + // try to update assignment metadata BUT restrict blocking for join group to the poll timeout + if (!updateAssignmentMetadataIfNeeded(timer, true)) { + // Assignment update is still in progress - do not return any records as they may become stale when update is completed + return ConsumerRecords.empty(); + } } else { while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) { log.warn("Still waiting for metadata");