
KAFKA-9048: Improve scalability in number of partitions in replica fetcher


Details

    • Type: Task
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Component/s: core

    Description

      https://issues.apache.org/jira/browse/KAFKA-9039 (https://github.com/apache/kafka/pull/7443) improves the performance of the replica fetcher (at both small and large numbers of partitions), but it does not improve its complexity or scalability in the number of partitions.

      I took a profile using async-profiler for the 1000-partition JMH replica fetcher benchmark. The big remaining culprits are:

      • ~18% looking up logStartOffset
      • ~45% FetchSessionHandler$Builder.add
      • ~19% FetchSessionHandler$Builder.build

      Suggestions

      1. The logStartOffset is looked up for every partition on each doWork pass. This requires a hashmap lookup even though the logStartOffset changes rarely. If the replica fetcher could be notified of updates to the logStartOffset, then we could reduce the overhead to a function of the number of logStartOffset updates instead of O(n) on each pass (see the first sketch after this list).
      2. The use of FetchSessionHandler means that we maintain a partitionStates hashmap in the replica fetcher and a sessionPartitions hashmap in the FetchSessionHandler. On each incremental fetch session pass, we need to reconcile these two hashmaps to determine which partitions were added/updated and which were removed. This reconciliation is especially expensive, requiring multiple passes over the fetching partitions and hashmap removes and puts for most partitions. The replica fetcher could be smarter by directly maintaining the fetch session's updated-partitions hashmap of FetchRequest.PartitionData, as well as a removed-partitions list, so that these do not need to be regenerated by reconciliation on each fetch pass (see the second sketch after this list).
      3. maybeTruncate requires an O(n) pass over the elements in partitionStates even if there are no partitions in truncating state. If we maintained some additional state about whether truncating partitions exist in partitionStates, or separated these states into a separate data structure, we would not need to iterate across all partitions on every doWork pass (see the third sketch after this list). I’ve seen clusters where this work takes about 0.5%-1% of CPU, which is minor but will become more substantial as the number of partitions increases.
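
      A minimal sketch of suggestion 1, assuming the log layer can notify the fetcher when a logStartOffset changes. All class and method names below are illustrative stand-ins, not actual Kafka internals:

      import scala.collection.concurrent.TrieMap

      // Simplified stand-in for org.apache.kafka.common.TopicPartition.
      final case class TopicPartition(topic: String, partition: Int)

      // Per-partition fetch state that the fetcher already iterates when building a
      // request. Holding logStartOffset here means the build loop reads a value it
      // already has in hand rather than doing a per-partition lookup into the log layer.
      final class PartitionFetchState(var fetchOffset: Long, initialLogStartOffset: Long) {
        @volatile var logStartOffset: Long = initialLogStartOffset
      }

      class ReplicaFetchStates {
        private val states = TrieMap.empty[TopicPartition, PartitionFetchState]

        def add(tp: TopicPartition, fetchOffset: Long, logStartOffset: Long): Unit =
          states(tp) = new PartitionFetchState(fetchOffset, logStartOffset)

        // Hypothetical notification hook: called by the log layer only when the log
        // start offset actually changes, so total cost is O(number of updates).
        def onLogStartOffsetUpdated(tp: TopicPartition, newLogStartOffset: Long): Unit =
          states.get(tp).foreach(_.logStartOffset = newLogStartOffset)

        // Fetch-request build loop: a single pass over state the fetcher owns anyway,
        // with no extra per-partition lookup for the current logStartOffset.
        def buildFetchData(): Map[TopicPartition, (Long, Long)] =
          states.iterator.map { case (tp, s) => tp -> (s.fetchOffset, s.logStartOffset) }.toMap
      }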
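
      A sketch of suggestion 2, again hypothetical: PartitionData stands in for FetchRequest.PartitionData, and the delta-tracking class and its methods are illustrative, not part of the existing FetchSessionHandler API:

      import scala.collection.mutable

      // Simplified stand-ins, as in the first sketch; PartitionData mirrors the fields
      // the fetcher would put into FetchRequest.PartitionData.
      final case class TopicPartition(topic: String, partition: Int)
      final case class PartitionData(fetchOffset: Long, logStartOffset: Long, maxBytes: Int)

      // Hypothetical delta tracker: the fetcher records additions/updates and removals as
      // they happen, so the next incremental fetch request can be built from these deltas
      // instead of reconciling partitionStates against the session's sessionPartitions map.
      class FetchSessionDeltas {
        private val updated = mutable.Map.empty[TopicPartition, PartitionData]
        private val removed = mutable.Set.empty[TopicPartition]

        // Called when a partition is added or its fetch offset / log start offset changes.
        def markUpdated(tp: TopicPartition, data: PartitionData): Unit = {
          removed -= tp
          updated(tp) = data
        }

        // Called when a partition is removed from the fetcher.
        def markRemoved(tp: TopicPartition): Unit = {
          updated -= tp
          removed += tp
        }

        // Consumed when building the next incremental fetch request; the cost is
        // O(partitions changed since the last request), not O(all fetching partitions).
        def drain(): (Map[TopicPartition, PartitionData], Set[TopicPartition]) = {
          val deltas = (updated.toMap, removed.toSet)
          updated.clear()
          removed.clear()
          deltas
        }
      }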
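
      A sketch of suggestion 3, also hypothetical: the wrapper and method names below are illustrative, not the existing PartitionStates API:

      import scala.collection.mutable

      // Simplified stand-in, as in the first sketch.
      final case class TopicPartition(topic: String, partition: Int)

      sealed trait ReplicaState
      case object Truncating extends ReplicaState
      case object Fetching extends ReplicaState

      // Hypothetical wrapper around partitionStates that also tracks which partitions are
      // currently in the Truncating state, so maybeTruncate can bail out in O(1) when
      // nothing needs truncation instead of scanning every partition on each doWork pass.
      class TrackedPartitionStates {
        private val states = mutable.Map.empty[TopicPartition, ReplicaState]
        private val truncating = mutable.Set.empty[TopicPartition]

        def update(tp: TopicPartition, state: ReplicaState): Unit = {
          states(tp) = state
          if (state == Truncating) truncating += tp else truncating -= tp
        }

        def remove(tp: TopicPartition): Unit = {
          states -= tp
          truncating -= tp
        }

        // Constant-time check per pass; copy before iterating in case doTruncate moves a
        // partition out of the Truncating state.
        def maybeTruncate(doTruncate: TopicPartition => Unit): Unit =
          if (truncating.nonEmpty) truncating.toList.foreach(doTruncate)
      }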

      If we can achieve 1 and 2, the complexity will be improved from a function of the total number of partitions to a function of the number of partitions with updated fetch offsets/log start offsets between each fetch. In general, only a minority of partitions will change between fetches, so this should greatly improve the average-case complexity.

People

    Assignee: Guozhang Wang
    Reporter: Lucas Bradstreet