Description
The bellow code snippet demonstrated the problem.
Basically, the unsubscribe() call leaves the KafkaConsumer in a state that means poll() will always return empty record sets, even if new topic-partitions have been assigned that have messages pending. This is because unsubscribe() sets SubscriptionState.needsPartitionAssignment to true, and assign() does not clear this flag. The only thing that clears this flag is when the consumer handles the response from a JoinGroup request.
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1))); ConsumerRecords<String, String> records = consumer.poll(100);// <- Works, returning records consumer.unsubscribe(); // Puts consumer into invalid state. consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2))); records = consumer.poll(100);// <- Always returns empty record set.