Kafka / KAFKA-2387 Improve KafkaConsumer API / KAFKA-2388

subscribe(topic)/unsubscribe(topic) should either take a callback that lets the user handle exceptions, or be synchronous.


Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.0.0
    • Component/s: None
    • Labels: None

    Description

      According to the mailing list discussion on the consumer interface, we'll replace:

      public void subscribe(String... topics);
      public void subscribe(TopicPartition... partitions);
      public Set<TopicPartition> subscriptions();
      

      with:

      void subscribe(List<String> topics, RebalanceCallback callback);
      void assign(List<TopicPartition> partitions);
      List<String> subscriptions();
      List<TopicPartition> assignments();
      

      We don't need the unsubscribe APIs anymore.
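
One possible reading of why the unsubscribe APIs become unnecessary is that each call to subscribe or assign replaces the previous list, so dropping a topic means passing a shorter list. The issue text does not state this, so the sketch below treats it as an assumption. `ProposedConsumer` and `InMemoryConsumer` are hypothetical names, `TopicPartition` is a minimal stand-in for the Kafka class, and no group management or callback invocation is modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Minimal stand-in for Kafka's TopicPartition so the sketch compiles on its own.
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    @Override public String toString() { return topic + "-" + partition; }
}

// Callback methods as listed in this issue.
interface RebalanceCallback {
    void onAssignment(List<TopicPartition> partitions);
    void onRevocation(List<TopicPartition> partitions);
    void onError(Exception e);
}

// The four proposed methods, gathered into a hypothetical interface.
interface ProposedConsumer {
    void subscribe(List<String> topics, RebalanceCallback callback);
    void assign(List<TopicPartition> partitions);
    List<String> subscriptions();
    List<TopicPartition> assignments();
}

// Hypothetical in-memory implementation; it only tracks the two lists.
final class InMemoryConsumer implements ProposedConsumer {
    private List<String> topics = Collections.emptyList();
    private List<TopicPartition> partitions = Collections.emptyList();

    @Override public void subscribe(List<String> topics, RebalanceCallback callback) {
        // Assumption: the new list replaces the old one; the callback is not invoked here.
        this.topics = new ArrayList<>(topics);
    }

    @Override public void assign(List<TopicPartition> partitions) {
        // Assumption: the new list replaces the old one.
        this.partitions = new ArrayList<>(partitions);
    }

    @Override public List<String> subscriptions() { return topics; }
    @Override public List<TopicPartition> assignments() { return partitions; }
}
```

Under that assumption, subscribing to ["a", "b"] and then to ["b"] leaves only "b" subscribed, and subscribing to an empty list clears the subscription.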

      The RebalanceCallback would look like:

      interface RebalanceCallback {
        void onAssignment(List<TopicPartition> partitions);
        void onRevocation(List<TopicPartition> partitions);
      
        // handle non-existing topics, etc.
        void onError(Exception e);
      }
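
As a rough illustration of how a caller might use such a callback, the sketch below implements the interface with a class that records each event, and drives it from a toy consumer. `RecordingCallback` and `ToyConsumer` are hypothetical names and are not part of Kafka. The toy consumer's behavior is an assumption made for the example: it reports unknown topics through onError, revokes the old partitions before assigning new ones, and gives every topic a single partition 0.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for Kafka's TopicPartition so the sketch compiles on its own.
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    @Override public String toString() { return topic + "-" + partition; }
}

// The callback interface as proposed in this issue.
interface RebalanceCallback {
    void onAssignment(List<TopicPartition> partitions);
    void onRevocation(List<TopicPartition> partitions);
    // handle non-existing topics, etc.
    void onError(Exception e);
}

// Hypothetical callback that records every event as a string.
final class RecordingCallback implements RebalanceCallback {
    final List<String> events = new ArrayList<>();
    @Override public void onAssignment(List<TopicPartition> partitions) { events.add("assigned " + partitions); }
    @Override public void onRevocation(List<TopicPartition> partitions) { events.add("revoked " + partitions); }
    @Override public void onError(Exception e) { events.add("error " + e.getMessage()); }
}

// Toy consumer (NOT KafkaConsumer): no broker, no group coordination.
final class ToyConsumer {
    private final List<String> knownTopics;
    private List<TopicPartition> assignments = new ArrayList<>();

    ToyConsumer(List<String> knownTopics) { this.knownTopics = knownTopics; }

    void subscribe(List<String> topics, RebalanceCallback callback) {
        for (String t : topics) {
            if (!knownTopics.contains(t)) {
                // Assumption: errors reach the user via the callback and leave state unchanged.
                callback.onError(new IllegalArgumentException("unknown topic: " + t));
                return;
            }
        }
        // Assumption: old partitions are revoked before new ones are assigned.
        if (!assignments.isEmpty()) callback.onRevocation(assignments);
        List<TopicPartition> next = new ArrayList<>();
        for (String t : topics) next.add(new TopicPartition(t, 0)); // one partition per topic
        assignments = next;
        callback.onAssignment(assignments);
    }

    List<TopicPartition> assignments() { return assignments; }
}
```

With this design the subscribe call never throws for a missing topic; the caller decides in onError whether to retry, log, or abort, which is the alternative to a synchronous subscribe that the issue title describes.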
      


          People

            Assignee: Jason Gustafson (hachikuji)
            Reporter: Jiangjie Qin (becket_qin)
            Votes: 0
            Watchers: 6
