Kafka / KAFKA-8135

Kafka Producer deadlocked on flush call with intermittent broker unavailability


    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: clients
    • Labels: None

      Description

      In KIP-91 we added the config delivery.timeout.ms to replace retries, and it defaults to 2 minutes. We've observed that when it is set to MAX_VALUE (e.g. in Kafka Streams, when EOS is turned on), the producer.flush call can sometimes block while the destination brokers are undergoing a period of unavailability:

      java.lang.Thread.State: WAITING (parking)
          at jdk.internal.misc.Unsafe.park(java.base@10.0.2/Native Method)
          - parking to wait for  <0x00000006aeb21a00> (a java.util.concurrent.CountDownLatch$Sync)
          at java.util.concurrent.locks.LockSupport.park(java.base@10.0.2/Unknown Source)
          at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@10.0.2/Unknown Source)
          at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@10.0.2/Unknown Source)
          at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@10.0.2/Unknown Source)
          at java.util.concurrent.CountDownLatch.await(java.base@10.0.2/Unknown Source)
          at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
          at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693)
          at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1066)
          at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:259)
          at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:520)
          at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:470)
          at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:458)
          at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)
          at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)
          at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)
          at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)
          at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
          at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
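
      For illustration, here is a minimal, hypothetical sketch of a plain producer configured this way (the bootstrap address, topic, and class name are placeholders, not taken from this report); with delivery.timeout.ms at MAX_VALUE, the flush() below can park indefinitely on ProduceRequestResult.await if the destination broker becomes unavailable:

      import java.util.Properties;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.common.serialization.StringSerializer;

      public class FlushHangRepro {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
              // Effectively "retry forever", as described above for Streams with EOS.
              props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);

              try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                  producer.send(new ProducerRecord<>("test-topic", "key", "value")); // placeholder topic
                  // If the destination broker becomes unavailable around here, this call
                  // parks on ProduceRequestResult.await() and may never return.
                  producer.flush();
              }
          }
      }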
      

      Even after the broker went back to normal, producers would still be blocked. One suspicion is that when the broker is not able to handle a request in time, the responses are somehow dropped inside the Sender, and hence whoever is waiting on the response would be blocked forever.

      We've observed such scenarios when 1) the broker transiently failed for a while, 2) the network was transiently partitioned, and 3) a bad broker config (e.g. ACLs) caused it to be unable to handle requests for a while.
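
      Until the root cause in the Sender is found, one hypothetical client-side mitigation is to bound the wait externally, e.g. by running flush() on a helper thread and abandoning the wait after a timeout. This is only a sketch: it unblocks the calling thread but leaves the stuck batches unresolved, so it would not be safe where flush semantics matter (e.g. for EOS commits):

      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.Future;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.TimeoutException;
      import org.apache.kafka.clients.producer.KafkaProducer;

      public final class BoundedFlush {
          private static final ExecutorService FLUSH_POOL = Executors.newSingleThreadExecutor();

          /**
           * Runs producer.flush() on a helper thread and gives up waiting after timeoutMs.
           * Returns true if the flush completed, false if the wait was abandoned.
           */
          public static boolean flushWithTimeout(KafkaProducer<?, ?> producer, long timeoutMs)
                  throws InterruptedException {
              Future<?> flush = FLUSH_POOL.submit(producer::flush);
              try {
                  flush.get(timeoutMs, TimeUnit.MILLISECONDS);
                  return true; // all in-flight records were completed
              } catch (TimeoutException e) {
                  // flush() is still parked on ProduceRequestResult.await(); the caller
                  // is unblocked, but the outstanding records are not confirmed.
                  return false;
              } catch (ExecutionException e) {
                  throw new RuntimeException(e.getCause());
              }
          }
      }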


            People

            • Assignee: Rajini Sivaram (rsivaram)
            • Reporter: Guozhang Wang (guozhang)
            • Votes: 0
            • Watchers: 3
