Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-2686

unsubscribe() call leaves KafkaConsumer in invalid state for manual topic-partition assignment

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 0.9.0.0
    • 0.9.0.0
    • consumer
    • None

    Description

      The bellow code snippet demonstrated the problem.

      Basically, the unsubscribe() call leaves the KafkaConsumer in a state that means poll() will always return empty record sets, even if new topic-partitions have been assigned that have messages pending. This is because unsubscribe() sets SubscriptionState.needsPartitionAssignment to true, and assign() does not clear this flag. The only thing that clears this flag is when the consumer handles the response from a JoinGroup request.

      final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      
      consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1)));
      ConsumerRecords<String, String> records = consumer.poll(100);// <- Works, returning records
      
      consumer.unsubscribe();   // Puts consumer into invalid state.
      
      consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2)));
      records = consumer.poll(100);// <- Always returns empty record set.
      

      Attachments

        Activity

          People

            guozhang Guozhang Wang
            BigAndy Andy Coates
            Jason Gustafson Jason Gustafson
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: