  1. Kafka
  2. KAFKA-4669

KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

    Details

    • Type: Bug
    • Status: Reopened
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 0.9.0.1
    • Fix Version/s: 0.11.0.1, 1.0.0
    • Component/s: clients
    • Labels:

      Description

      There is no try/catch in NetworkClient.handleCompletedReceives. If an exception is thrown after inFlightRequests.completeNext(source) has removed the request from the in-flight queue, the corresponding RecordBatch's done method is never called, and KafkaProducer.flush hangs waiting on that RecordBatch.

      I've checked the 0.10 code and believe this bug also exists in the 0.10 versions.
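
      The failure mode described above can be reproduced in isolation. The following is a minimal sketch, not Kafka code: the class FlushHangSketch, the InFlight type, and the handleUnguarded/handleGuarded methods are hypothetical stand-ins for the in-flight queue and the latch that flush waits on. The guarded variant only illustrates the missing try/finally; it is not the patch applied to Kafka.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class FlushHangSketch {

    /** Hypothetical stand-in for an in-flight request whose batch a flush() caller waits on. */
    static final class InFlight {
        final int correlationId;
        // Plays the role of the latch awaited in ProduceRequestResult.await().
        final CountDownLatch done = new CountDownLatch(1);

        InFlight(int correlationId) {
            this.correlationId = correlationId;
        }
    }

    /** Throws the same kind of exception as the correlation check in the report. */
    static void correlate(InFlight req, int responseCorrelationId) {
        if (req.correlationId != responseCorrelationId) {
            throw new IllegalStateException("Correlation id for response (" + responseCorrelationId
                    + ") does not match request (" + req.correlationId + ")");
        }
    }

    /** Unguarded: the request is dequeued, then the check throws before done is signalled. */
    static void handleUnguarded(Deque<InFlight> inFlight, int responseCorrelationId) {
        InFlight req = inFlight.pollFirst(); // analogous to inFlightRequests.completeNext(source)
        correlate(req, responseCorrelationId);
        req.done.countDown(); // skipped when correlate throws
    }

    /** Guarded: completion is signalled in finally, so a waiter is always released. */
    static void handleGuarded(Deque<InFlight> inFlight, int responseCorrelationId) {
        InFlight req = inFlight.pollFirst();
        try {
            correlate(req, responseCorrelationId);
        } finally {
            req.done.countDown();
        }
    }

    /** Returns true if a thread waiting on the request's latch would be released. */
    public static boolean waiterReleased(boolean guarded) {
        Deque<InFlight> inFlight = new ArrayDeque<>();
        InFlight req = new InFlight(703764);
        inFlight.addLast(req);
        try {
            if (guarded) {
                handleGuarded(inFlight, 703766);
            } else {
                handleUnguarded(inFlight, 703766);
            }
        } catch (IllegalStateException e) {
            // Mirrors the I/O thread logging "Uncaught error" and carrying on.
        }
        try {
            // Bounded wait so the sketch itself cannot hang.
            return req.done.await(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded released: " + waiterReleased(false)); // false
        System.out.println("guarded released: " + waiterReleased(true));    // true
    }
}
```

      In the unguarded path the request has already left the queue when the exception is thrown, so nothing is left that could ever count the latch down; an unbounded await on it blocks forever.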

      A real case: first, a correlation id mismatch exception occurs:
      13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130) - Uncaught error in kafka producer I/O thread:
      java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
      at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
      at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
      at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
      at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
      at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
      at java.lang.Thread.run(Thread.java:745)

      jstack then shows the thread that called flush blocked at:
      at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
      at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
      at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
      at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
      at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
      at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
      at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
      at java.lang.Thread.run(Thread.java:745)
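
      Since KafkaProducer.flush takes no timeout, a caller such as the sink above has no way to bound the wait directly. As a possible caller-side mitigation (an assumption on my part, not something proposed in this issue and not a fix for the underlying bug), the blocking call could be run on a helper thread and abandoned after a deadline. BoundedFlush and runWithTimeout are hypothetical names; with a real producer the Runnable would be something like producer::flush.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedFlush {

    /**
     * Runs a blocking call on a daemon helper thread and waits at most timeoutMs.
     * Returns true if the call finished in time, false if it timed out or the
     * waiting thread was interrupted.
     */
    public static boolean runWithTimeout(Runnable blockingCall, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-flush");
            t.setDaemon(true); // a stuck helper must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(blockingCall);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the helper thread
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return false;
        } catch (ExecutionException e) {
            // Surface a failure of the blocking call itself to the caller.
            throw new RuntimeException(e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a flush() that never returns: await a latch nobody counts down.
        Runnable stuck = () -> {
            try {
                new CountDownLatch(1).await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println(runWithTimeout(stuck, 200));    // false
        System.out.println(runWithTimeout(() -> { }, 200)); // true
    }
}
```

      A timeout here only tells the caller that the flush did not complete; the batch is still unacknowledged, so the caller has to decide whether to retry, discard the producer, or fail the transaction.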

      client code

            People

            • Assignee: Rajini Sivaram (rsivaram)
            • Reporter: Cheng Ju (wolvever)