Kafka / KAFKA-9266

KafkaConsumer manual assignment does not reset group assignment

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 2.3.0
    • 2.3.1
    • clients
    • None

    Description

      When using the manual assignment API, SubscriptionState keeps topics in its groupSubscription member even after the consumer is no longer assigned any of their partitions.

      See the following code which shows the unexpected behavior:

          TopicPartition tp1 = new TopicPartition("a", 0);
          TopicPartition tp2 = new TopicPartition("b", 0);
          LogContext logContext = new LogContext();
          SubscriptionState state = new SubscriptionState(logContext, OffsetResetStrategy.NONE);
          state.assignFromUser(ImmutableSet.of(tp1, tp2));
          state.unsubscribe();
          state.assignFromUser(ImmutableSet.of(tp1));
          assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Succeeds
          
          state.assignFromUser(ImmutableSet.of(tp1, tp2));
          state.assignFromUser(ImmutableSet.of(tp1));
          assertEquals(ImmutableSet.of("a"), state.groupSubscription()); // Fails: Expected [a] but was [a, b]
      

      The problem seems to be that within SubscriptionState.changeSubscription() the groupSubscription only grows and is never trimmed if the assignment is manual:

          private boolean changeSubscription(Set<String> topicsToSubscribe) {
              ...
              groupSubscription = new HashSet<>(groupSubscription);
              groupSubscription.addAll(topicsToSubscribe);
              ....
          }
      

      This behavior in turn causes the client to send METADATA requests for topics whose partitions are no longer assigned:

          KafkaConsumer<?, ?> consumer; // assumed to be configured and initialized elsewhere
          consumer.assign(ImmutableList.of(topicPartition1, topicPartition2));
          consumer.poll(Duration.ofMillis(100)); // Sends a MetadataRequest to the broker for topic1 and topic2 (timeout value is arbitrary)
          consumer.assign(ImmutableList.of(topicPartition1));
          consumer.poll(Duration.ofMillis(100)); // AGAIN sends a MetadataRequest for topic1 and topic2 instead of only topic1
      

      This in turn causes the deletion of the topic behind topicPartition2 to fail. The workaround is to call consumer.unsubscribe() before the second consumer.assign().
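
      The accumulation and the reset-before-reassign workaround described above can be illustrated with a small self-contained model. This is only a sketch: GroupSubscriptionSketch, assignTopics() and reset() are hypothetical names invented for illustration and are not part of Kafka. The model copies only the two grow-only lines quoted from changeSubscription() and assumes that clearing the state behaves like the unsubscribe() call in the first test above.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the topic bookkeeping described in this issue; not Kafka's actual SubscriptionState. */
class GroupSubscriptionSketch {
    private Set<String> groupSubscription = new HashSet<>();

    /** Mirrors the reported behavior: topics are merged in and never removed. */
    void assignTopics(Set<String> topicsToSubscribe) {
        groupSubscription = new HashSet<>(groupSubscription);
        groupSubscription.addAll(topicsToSubscribe);
    }

    /** Hypothetical stand-in for the workaround: forget all remembered topics. */
    void reset() {
        groupSubscription = new HashSet<>();
    }

    Set<String> groupSubscription() {
        return groupSubscription;
    }

    public static void main(String[] args) {
        GroupSubscriptionSketch state = new GroupSubscriptionSketch();

        // Without a reset, the stale topic "b" stays in the set.
        state.assignTopics(Set.of("a", "b"));
        state.assignTopics(Set.of("a"));
        System.out.println("stale topic still present: " + state.groupSubscription().contains("b"));

        // With a reset before reassigning, only the current topic remains.
        state.reset();
        state.assignTopics(Set.of("a"));
        System.out.println("stale topic after reset: " + state.groupSubscription().contains("b"));
    }
}
```

      In this model the second assignTopics() call cannot shrink the set, which matches the failing assertion in the description. Whether replacing the set on every manual assignment is the right change for SubscriptionState itself is not something this sketch can establish.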

          People

            Assignee: Unassigned
            Reporter: G G (ggs)