Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-4669

KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Reopened
    • Critical
    • Resolution: Unresolved
    • 0.9.0.1
    • 0.11.0.1, 1.0.0
    • clients

    Description

      There is no try catch in NetworkClient.handleCompletedReceives. If an exception is thrown after inFlightRequests.completeNext(source), then the corresponding RecordBatch's done will never get called, and KafkaProducer.flush will hang on this RecordBatch.

      I've checked 0.10 code and think this bug does exist in 0.10 versions.

      A real case. First a correlateId not match exception happens:
      13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130) - Uncaught error in kafka producer I/O thread:
      java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
      at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
      at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
      at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
      at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
      at java.lang.Thread.run(Thread.java:745)

      Then jstack shows the thread is hanging on:
      at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
      at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
      at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
      at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
      at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
      at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
      at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
      at java.lang.Thread.run(Thread.java:745)

      client code

      Attachments

        Issue Links

          Activity

            People

              rsivaram Rajini Sivaram
              wolvever Cheng Ju
              Votes:
              0 Vote for this issue
              Watchers:
              21 Start watching this issue

              Dates

                Created:
                Updated: