  Kafka / KAFKA-14029

Consumer response serialization could block other response handlers at scale


Details

    • Type: Improvement
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 3.2.0
    • Fix Version/s: None
    • Component/s: consumer
    • Labels: None

    Description

      Hi,

      We have been using our in-house tools to test Kafka's scalability, to get an idea of how a large-scale deployment will behave and where the bottlenecks are. For now, we are looking at version 3.2 and focusing on a many-consumers scenario.

      On the consumer side, we want to report a possible issue and, eventually, propose a solution, as a way to build our expertise in the system. When a new consumer is added to a group, the code path

      org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle // (has a synchronized block)
       org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onLeaderElected
         org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected
          org.apache.kafka.clients.consumer.internals.ConsumerProtocol.serializeAssignment
           org.apache.kafka.clients.consumer.internals.ConsumerProtocol.serializeAssignment
            org.apache.kafka.common.protocol.MessageUtil.toVersionPrefixedByteBuffer // linear on size of message

      could end up being costly when the message is large. toVersionPrefixedByteBuffer appears to be linear in the size of the message. Writing an array in linear time is perfectly reasonable on its own, but under certain conditions, e.g. while holding a lock, it can cause undesired contention. In this case, it is invoked to serialize the assignment when a new consumer is added. This path is itself wrapped in a loop that seems to depend on the number of assignments, which could become a second problematic dimension as it grows, since the two costs nest. The loop is here:

      [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeaderElected]

      // ...
      Map<String, ByteBuffer> groupAssignment = new HashMap<>();
      for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {
          ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue()); // calls toVersionPrefixedByteBuffer
          groupAssignment.put(assignmentEntry.getKey(), buffer);
      }
      // ...
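      To illustrate why linear work inside a synchronized block matters, here is a minimal, self-contained sketch (not Kafka code). The names MonitorContentionDemo, coordinatorLock and serialize are hypothetical: coordinatorLock stands in for the monitor the join handler holds, and serialize is a simplified version-prefixed encoding, not ConsumerProtocol's real wire format. The main thread plays the join handler: it takes the lock, waits until a "heartbeat" thread is parked on the same monitor, then serializes a large payload. The heartbeat can only run once the serialization has finished.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo (not Kafka code): a "heartbeat" thread has to wait while the
// "join" thread does linear-time serialization inside a synchronized block on
// the same monitor. coordinatorLock stands in for the coordinator's monitor.
public class MonitorContentionDemo {
    static final Object coordinatorLock = new Object();
    static final List<String> events = Collections.synchronizedList(new ArrayList<>());

    // Simplified version-prefixed encoding: 2-byte version, then one int per entry.
    static ByteBuffer serialize(short version, int entries) {
        ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES + Integer.BYTES * entries);
        buffer.putShort(version);
        for (int i = 0; i < entries; i++) {
            buffer.putInt(i); // one write per entry: cost grows linearly with size
        }
        buffer.flip();
        return buffer;
    }

    public static void main(String[] args) throws InterruptedException {
        events.clear();
        Thread heartbeat = new Thread(() -> {
            synchronized (coordinatorLock) { // same monitor as the "join handler"
                events.add("heartbeat handled");
            }
        });

        synchronized (coordinatorLock) {
            heartbeat.start();
            // Wait until the heartbeat thread is actually parked on our monitor.
            while (heartbeat.getState() != Thread.State.BLOCKED) {
                Thread.onSpinWait();
            }
            // Linear-time work while still holding the lock.
            ByteBuffer assignment = serialize((short) 0, 1_000_000);
            events.add("assignment serialized: " + assignment.remaining() + " bytes");
        }
        heartbeat.join();
        System.out.println(events);
    }
}
```

      The ordering of the two events is deterministic: the heartbeat cannot record anything until the serialization is done and the monitor is released. How long that wait lasts in a real cluster depends on the assignment sizes involved, which this sketch does not measure.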

      The question is: does the assignment need to be serialized inside the synchronized block? If the assignment is very large, serialization could easily add a few seconds to the request and block other handlers that use the same lock, such as HeartbeatResponseHandler.
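      One possible shape for an answer, sketched under assumptions: do the expensive, pure serialization without holding the monitor, then take the lock only briefly to re-validate state and publish the result. Everything here is hypothetical (CoordinatorSketch, generation, pendingAssignment and the comma-joined serialize stand-in are illustrative, not Kafka's API). Whether this is safe in the real coordinator depends on whether serialization reads any lock-guarded state, which this report does not establish. The generation re-check shows one way to discard a result that went stale while the lock was released.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: serialize outside the monitor, then lock only to
// validate state and publish. All names are illustrative, not Kafka's.
public class CoordinatorSketch {
    private final Object lock = new Object();
    private int generation = 1;                        // guarded by lock
    private Map<String, ByteBuffer> pendingAssignment; // guarded by lock

    // Stand-in serializer: a pure function of its input, so no lock is needed.
    static ByteBuffer serialize(List<String> partitions) {
        byte[] bytes = String.join(",", partitions).getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.wrap(bytes);
    }

    boolean onLeaderElected(int expectedGeneration, Map<String, List<String>> assignments) {
        // 1. Expensive work outside the monitor: other handlers are not blocked here.
        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
        for (Map.Entry<String, List<String>> e : assignments.entrySet()) {
            groupAssignment.put(e.getKey(), serialize(e.getValue()));
        }
        // 2. Short critical section: re-check state, then publish.
        synchronized (lock) {
            if (generation != expectedGeneration) {
                return false; // state changed meanwhile; drop the stale result
            }
            pendingAssignment = groupAssignment;
            return true;
        }
    }

    void bumpGeneration() {
        synchronized (lock) {
            generation++;
        }
    }

    Map<String, ByteBuffer> pendingAssignment() {
        synchronized (lock) {
            return pendingAssignment;
        }
    }

    public static void main(String[] args) {
        CoordinatorSketch c = new CoordinatorSketch();
        System.out.println(c.onLeaderElected(1, Map.of("consumer-1", List.of("orders-0", "orders-1"))));
        c.bumpGeneration();
        System.out.println(c.onLeaderElected(1, Map.of("consumer-1", List.of("orders-0"))));
    }
}
```

      Running main prints true, then false: the second call serialized against a generation that is no longer current, so its result is discarded instead of published. The trade-off is that a rebalance racing with serialization wastes that work, in exchange for never holding the monitor during the linear-time part.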


          People

            Assignee: Unassigned
            Reporter: BugFinder (ucarebugfinder)
            Votes: 0
            Watchers: 2
