Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-15824

SubscriptionState's maybeValidatePositionForCurrentLeader should handle partition which isn't subscribed yet

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 3.7.0
    • clients
    • None

    Description

      As can be maybeValidatePositionForCurrentLeader doesn't check if partition is subscribed. It can be done by checking TopicPartitionState cached is null or not, as done by maybeCompleteValidation. So it throws IllegalStateException for a partition that is yet not subscribed.

      Lack of this check makes writing thread-safe code w.r.t SubscriptionState class awkward. This can be seen from the example code below. For example, at line 1 partitionA would be in allCurrentlySubscribedTopics, but at line 2 it could be removed from subscribed partitions(in a separate thread). So this forces the user of this class to handle IllegalStateException which is awkward.

      // Following is example code for the user of SubscriptionState::maybeValidatePositionForCurrentLeader
      
      Set<TopicPartition> allCurrentlySubscribedTopics = subscriptionState.assignedPartitions(); // line 1
      if(allCurrentlySubscribedTopics.contains(tp)) {
           ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
        try() {
          subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, leaderAndEpoch); // line 2
        } catch (IllegalStateException e) {
         // recover from it. // line 3
        }
      }

       

      Attachments

        Issue Links

          Activity

            People

              mayanksnarula Mayank Shekhar Narula
              mayanksnarula Mayank Shekhar Narula
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: