Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-7096

Consumer should drop the data for unassigned topic partitions

    XMLWordPrintableJSON

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.2.0
    • Component/s: None
    • Labels:
      None

      Description

      currently if a client has assigned topics : T1, T2, T3 and calls poll(), the poll might fetch data for partitions for all 3 topics T1, T2, T3. Now if the client unassigns some topics (for example T3) and calls poll() we still hold the data (for T3) in the completedFetches queue until we actually reach the buffered data for the unassigned Topics (T3 in our example) on subsequent poll() calls, at which point we drop that data. This process of holding the data is unnecessary.

      When a client creates a topic, it takes time for the broker to fetch ACLs for the topic. But during this time, the client will issue fetchRequest for the topic, it will get response for the partitions of this topic. The response consist of TopicAuthorizationException for each of the partitions. This response for each partition is wrapped with a completedFetch and added to the completedFetches queue. Now when the client calls the next poll() it sees the TopicAuthorizationException from the first buffered CompletedFetch. At this point the client chooses to sleep for 1.5 min as a backoff (as per the design), hoping that the Broker fetches the ACL from ACL store in the meantime. Actually the Broker has already fetched the ACL by this time. When the client calls poll() after the sleep, it again sees the TopicAuthorizationException from the second completedFetch and it sleeps again. So it takes (1.5 * 60 * partitions) seconds before the client can see any data. With this patch, the client when it sees the first TopicAuthorizationException, it can all assign(EmptySet), which will get rid of the buffered completedFetches (those with TopicAuthorizationException) and it can again call assign(TopicPartitions) before calling poll(). With this patch we found that client was able to get the records as soon as the Broker fetched the ACLs from ACL store.

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                mgharat Mayuresh Gharat
                Reporter:
                mgharat Mayuresh Gharat
              • Votes:
                0 Vote for this issue
                Watchers:
                2 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: