KAFKA-12838: Kafka Broker - Request threads inefficiently blocking during produce


Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.7.0, 2.8.0
    • Fix Version/s: None
    • Component/s: core
    • Labels: None

    Description

      Hello, I have been using Kafka brokers for a while and have run into a problem with the way a Kafka broker handles produce requests. If there are multiple producers to the same topic and partition, any request handler thread handling a produce for that topic and partition is blocked until all requests ahead of it are done. Request handler threads for the entire broker can become exhausted waiting on the same partition lock, blocking requests for other partitions that do not need that lock.
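
      To make the broker-wide effect concrete, here is a minimal, self-contained sketch (not broker code; the pool size, partition count, and sleep are made up) of a fixed handler pool being monopolized by one partition's lock: the lone partition-1 request waits behind the partition-0 requests even though partition 1's lock is free the whole time.

      import java.util.concurrent.{Executors, TimeUnit}

      object PartitionLockContentionDemo {
        // One lock object per partition, mirroring the per-log lock in the broker.
        private val partitionLocks = Array.fill(8)(new Object)

        // Stand-in for Log.append: the expensive work happens while the lock is held.
        private def append(partition: Int): Unit = partitionLocks(partition).synchronized {
          Thread.sleep(50)
        }

        def main(args: Array[String]): Unit = {
          val handlers = Executors.newFixedThreadPool(8) // stand-in for num.io.threads
          // 32 produce requests for partition 0: at any moment one handler thread holds
          // the lock and seven more are blocked on it, so the single partition-1 request
          // has to wait even though nothing is contending for partition 1.
          (1 to 32).foreach(_ => handlers.execute(() => append(0)))
          handlers.execute(() => append(1))
          handlers.shutdown()
          handlers.awaitTermination(10, TimeUnit.SECONDS)
        }
      }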

      Once that starts happening, requests back up, the request queue can reach its maximum, and network threads begin to be paused, cascading the problem further. Overall performance ends up degraded. I'm not as focused on the cascade at the moment as I am on the initial contention. Intuitively, I would expect lock contention on a single partition to ONLY affect throughput on that partition, not the entire broker.
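
      For context, the broker settings involved in that cascade are num.io.threads (the size of the request handler pool) and queued.max.requests (the bound on the shared request queue; once it fills, the network threads are paused). The values below are just the stock defaults, shown for illustration:

      # broker server.properties (defaults, for illustration only)
      num.io.threads=8
      queued.max.requests=500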

       

      The append call within the request handler originates here:

      https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/KafkaApis.scala#L638

      Further down the stack, the lock used during append is acquired here: https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/log/Log.scala#L1165

      At this point, the first request holds the lock for the duration of the append, and subsequent requests for the same partition block waiting for the lock, each tying up an io thread (request handler).
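
      In simplified form (a paraphrased model, not the actual Log.scala code), the pattern is roughly the following: validation, offset assignment and, for compressed batches, re-compression all run inside the same per-log lock, so producers to the same partition serialize on it.

      // Simplified model of the locking pattern in kafka.log.Log.append (2.8.0);
      // names and details are reduced for illustration.
      class SimplifiedLog {
        private val lock = new Object
        private val activeSegment = new java.io.ByteArrayOutputStream()

        def appendAsLeader(batch: Array[Byte]): Unit = lock synchronized {
          // 1. Validate records, assign offsets and, for compressed batches, rebuild the
          //    batch (LogValidator -> MemoryRecordsBuilder -> LZ4 stream) -- the work
          //    visible at the top of the RUNNABLE thread in the jstack below.
          val validated = batch.clone()
          // 2. Write the validated batch to the active segment and update log state.
          activeSegment.write(validated)
          // Every other produce request for this partition is blocked on `lock` until
          // both steps complete.
        }
      }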

      At first glance, it seems like it would make the most sense to be able to (via config?) funnel produce requests for the same partition through their own request queue of sorts and dispatch them such that at most one io thread is tied up at a time for a given partition. There are a number of other reasons the lock could be held as well, but this should at least help mitigate the issue. I'm assuming this is easier said than done and likely requires significant refactoring to achieve properly, but I'm hoping it is something that could end up on some sort of long-term roadmap.
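
      To illustrate that suggestion, here is a rough, purely hypothetical sketch (nothing like this exists in the broker today, and every name in it is invented): produce appends are keyed by partition and queued, and at most one append per partition is handed to the handler pool at a time, so a hot partition occupies one io thread instead of blocking many of them.

      import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ExecutorService}
      import java.util.concurrent.atomic.AtomicBoolean

      // Hypothetical sketch only, not Kafka code.
      class PerPartitionAppendDispatcher(handlerPool: ExecutorService) {
        private case class PartitionState(queue: ConcurrentLinkedQueue[Runnable],
                                          appendInFlight: AtomicBoolean)
        private val partitions = new ConcurrentHashMap[Int, PartitionState]()

        // Queue the append for its partition instead of running it directly on the
        // calling request handler thread.
        def submit(partition: Int)(append: => Unit): Unit = {
          val state = partitions.computeIfAbsent(
            partition,
            _ => PartitionState(new ConcurrentLinkedQueue[Runnable], new AtomicBoolean(false)))
          state.queue.add(() => append)
          maybeDispatch(state)
        }

        // Hand at most one queued append per partition to the handler pool; when it
        // finishes, try to dispatch the next one for that partition.
        private def maybeDispatch(state: PartitionState): Unit = {
          if (state.appendInFlight.compareAndSet(false, true)) {
            val task = state.queue.poll()
            if (task == null) {
              state.appendInFlight.set(false)
              // Re-check in case a submit raced in between the poll and the reset.
              if (!state.queue.isEmpty) maybeDispatch(state)
            } else {
              handlerPool.execute { () =>
                try task.run()
                finally {
                  state.appendInFlight.set(false)
                  if (!state.queue.isEmpty) maybeDispatch(state)
                }
              }
            }
          }
        }
      }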

       

      Snippet from jstack. Almost all request handler threads (there are 256 of them, raised from 25 to mitigate the issue) are blocked waiting on the same lock due to the number of producers we have.

       

      "data-plane-kafka-request-handler-254" #335 daemon prio=5 os_prio=0 tid=0x00007fb1c9f13000 nid=0x53f1 runnable [0x00007fad35796000]
         java.lang.Thread.State: RUNNABLE
      	at org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.<init>(KafkaLZ4BlockOutputStream.java:82)
      	at org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.<init>(KafkaLZ4BlockOutputStream.java:125)
      	at org.apache.kafka.common.record.CompressionType$4.wrapForOutput(CompressionType.java:101)
      	at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:134)
      	at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:170)
      	at org.apache.kafka.common.record.MemoryRecords.builder(MemoryRecords.java:508)
      	at kafka.log.LogValidator$.buildRecordsAndAssignOffsets(LogValidator.scala:500)
      	at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:455)
      	at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:106)
      	at kafka.log.Log.$anonfun$append$2(Log.scala:1126)
      	- locked <0x00000004c9a6fd60> (a java.lang.Object)
      	at kafka.log.Log.append(Log.scala:2387)
      	at kafka.log.Log.appendAsLeader(Log.scala:1050)
      	at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079)
      	at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1067)
      	at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:953)
      	at kafka.server.ReplicaManager$$Lambda$1078/1017241486.apply(Unknown Source)
      	at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
      	at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
      	at scala.collection.mutable.HashMap.map(HashMap.scala:35)
      	at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:941)
      	at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:621)
      	at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:625)
      	at kafka.server.KafkaApis.handle(KafkaApis.scala:137)
      	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
      	at java.lang.Thread.run(Thread.java:748)
      
      
      "data-plane-kafka-request-handler-253" #334 daemon prio=5 os_prio=0 tid=0x00007fb1c9f11000 nid=0x53f0 waiting for monitor entry [0x00007fad35897000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at kafka.log.Log.$anonfun$append$2(Log.scala:1104)
      	- waiting to lock <0x00000004c9a6fd60> (a java.lang.Object)
      	at kafka.log.Log.append(Log.scala:2387)
      	at kafka.log.Log.appendAsLeader(Log.scala:1050)
      	at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1079)
      	at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1067)
      	at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:953)
      	at kafka.server.ReplicaManager$$Lambda$1078/1017241486.apply(Unknown Source)
      	at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
      	at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
      	at scala.collection.mutable.HashMap.map(HashMap.scala:35)
      	at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:941)
      	at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:621)
      	at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:625)
      	at kafka.server.KafkaApis.handle(KafkaApis.scala:137)
      	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
      	at java.lang.Thread.run(Thread.java:748)

       

       

          People

            Assignee: Unassigned
            Reporter: Ryan Cabral
            Votes: 1
            Watchers: 4