Description
maybeValidatePositionForCurrentLeader doesn't check whether the partition is subscribed. This could be done by checking whether the cached TopicPartitionState is null, as maybeCompleteValidation does. As a result, it throws IllegalStateException for a partition that is not yet subscribed.
The lack of this check makes it awkward to write thread-safe code against the SubscriptionState class, as the example code below shows. At line 1, the partition tp would be in allCurrentlySubscribedTopics, but by line 2 it could have been removed from the subscribed partitions by a separate thread. This forces the user of the class to catch and handle IllegalStateException.
// Following is example code for the user of SubscriptionState::maybeValidatePositionForCurrentLeader
Set<TopicPartition> allCurrentlySubscribedTopics = subscriptionState.assignedPartitions(); // line 1
if (allCurrentlySubscribedTopics.contains(tp)) {
    ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
    try {
        subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, tp, leaderAndEpoch); // line 2
    } catch (IllegalStateException e) {
        // recover from it. // line 3
    }
}
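The null check proposed above can be illustrated with a minimal, self-contained sketch. It does not use the Kafka classes: SubscriptionStateSketch, PartitionState, and the String partition key are hypothetical stand-ins, and the epoch comparison is a simplified assumption about what validation involves. The only point is that the lookup and the null check happen inside the same synchronized method, so an unassigned partition becomes a no-op instead of an exception.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SubscriptionState; this is NOT the Kafka class.
class SubscriptionStateSketch {

    // Hypothetical per-partition state, standing in for TopicPartitionState.
    static final class PartitionState {
        int leaderEpoch = -1;
        boolean awaitingValidation = false;
    }

    private final Map<String, PartitionState> assignment = new HashMap<>();

    synchronized void assign(String tp) {
        assignment.putIfAbsent(tp, new PartitionState());
    }

    synchronized void unassign(String tp) {
        assignment.remove(tp);
    }

    // Returns true if validation was started; returns false if it was not
    // needed or if the partition is not (or is no longer) assigned.
    synchronized boolean maybeValidatePositionForCurrentLeader(String tp, int currentLeaderEpoch) {
        PartitionState state = assignment.get(tp);
        if (state == null) {
            // Partition was removed, possibly by another thread: treat as a no-op.
            return false;
        }
        if (currentLeaderEpoch > state.leaderEpoch) {
            state.leaderEpoch = currentLeaderEpoch;
            state.awaitingValidation = true;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        SubscriptionStateSketch s = new SubscriptionStateSketch();
        s.assign("topicA-0");
        System.out.println(s.maybeValidatePositionForCurrentLeader("topicA-0", 5));
        s.unassign("topicA-0"); // simulates a concurrent unsubscribe
        System.out.println(s.maybeValidatePositionForCurrentLeader("topicA-0", 6));
    }
}
```

With a check of this shape, the caller would need neither the contains() pre-check at line 1 nor the try/catch around line 2, because the race between the two calls no longer surfaces as an exception.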
Issue Links
- Dependency
  - KAFKA-15627 KIP-951, Java client changes to incorporate the leader discovery optimisations (Resolved)
  - KAFKA-15970 KIP-951, port newly added tests in FetcherTest.java to FetchRequestManagerTest.java (Resolved)
- is a child of
  - KAFKA-15868 KIP-951 - Leader discovery optimisations for the client (Closed)
- is a clone of
  - KAFKA-15415 In Java-client, backoff should be skipped for retried producer-batch to a new leader (Resolved)