  Kafka / KAFKA-6876

Sender exceptions ignored by WorkerSourceTask producer Callback causing data loss


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 0.11.0.1, 1.1.0, 2.0.1
    • Fix Version/s: None
    • Component/s: connect
    • Labels: None
    • Environment: Linux, JDK 8

    Description

      The producer callback in WorkerSourceTask handles exceptions reported for a send() by logging them at ERROR level and continuing. As a result, offsets can be committed for records that were never written to Kafka. Those records are effectively skipped, which in our use case means data loss.
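
      To make the mechanism concrete, here is a minimal Java model. It assumes (as the recordSent() call in the quoted callback suggests) that the task flushes its latest source offset once no sends are outstanding, and that a failed send stops counting as outstanding. OffsetCommitModel and its methods are hypothetical names for illustration; this is not Kafka's WorkerSourceTask code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of how a source task commits offsets.
// NOT Kafka's WorkerSourceTask; all names are illustrative assumptions.
class OffsetCommitModel {
    private final Set<Long> outstanding = new HashSet<>();
    private final List<Long> delivered = new ArrayList<>();
    private long latestOffset = -1; // source offset of the last record handed to send()

    void send(long offset) {
        outstanding.add(offset);
        latestOffset = offset; // recorded when the send is issued, before any callback runs
    }

    // Mirrors the quoted callback: log on error, but always mark the record complete.
    void onCompletion(long offset, Exception e) {
        if (e != null) {
            System.err.println("failed to send record " + offset + ": " + e.getMessage());
        } else {
            delivered.add(offset);
        }
        outstanding.remove(offset); // like recordSent(): runs for failures too
    }

    // Offsets are flushed only when nothing is outstanding.
    long commit() {
        if (!outstanding.isEmpty()) {
            throw new IllegalStateException("sends still outstanding");
        }
        return latestOffset;
    }

    List<Long> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        OffsetCommitModel task = new OffsetCommitModel();
        for (long offset = 0; offset < 5; offset++) {
            task.send(offset);
        }
        for (long offset = 0; offset < 5; offset++) {
            Exception e = offset == 2 ? new IllegalStateException("Producer is closed forcefully.") : null;
            task.onCompletion(offset, e);
        }
        // The committed offset covers record 2 even though it was never delivered.
        System.out.println("committed=" + task.commit() + " delivered=" + task.delivered());
    }
}
```

      Running main prints committed=4 delivered=[0, 1, 3, 4]: the committed offset moves past record 2, so after a restart the task resumes after it and the record is never re-sent.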

      The comment in the callback's onCompletion() method says this should "basically never happen ... callbacks with exceptions should never be invoked in practice", but we have seen it happen several times in production, especially when the JVM heap is close to exhaustion and the Sender thread throws an exception (often caused by KAFKA-6551).
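
      For reference, the "default settings for zero data loss" mentioned in the callback comment ("infinite" retries, indefinite blocking on full buffers, "infinite" request timeouts) map roughly onto the producer settings sketched below. The keys are standard producer configuration names, but treat the exact set and values as an assumption and check Worker.java for the version you run; acks=all is included as the usual durability setting. ZeroLossProducerSettings is a hypothetical helper.

```java
import java.util.Properties;

// Hypothetical helper: an approximation of the "zero data loss" producer
// settings the callback comment refers to. Verify against your Connect version.
class ZeroLossProducerSettings {
    static Properties overrides() {
        Properties props = new Properties();
        props.setProperty("acks", "all");                                             // wait for all in-sync replicas
        props.setProperty("retries", Integer.toString(Integer.MAX_VALUE));            // "infinite" retries
        props.setProperty("max.block.ms", Long.toString(Long.MAX_VALUE));             // block indefinitely on a full buffer
        props.setProperty("request.timeout.ms", Integer.toString(Integer.MAX_VALUE)); // "infinite" request timeout
        return props;
    }

    public static void main(String[] args) {
        overrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

      None of these settings covers the case reported here: once the Sender thread aborts incomplete batches, the callback receives an exception regardless of retries or timeouts.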

      From WorkerSourceTask line 253:

      new Callback() {
         @Override
         public void onCompletion(RecordMetadata recordMetadata, Exception e) {
             if (e != null) {
                 // Given the default settings for zero data loss, this should basically never happen --
                 // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
                 // timeouts, callbacks with exceptions should never be invoked in practice. If the
                 // user overrode these settings, the best we can do is notify them of the failure via
                 // logging.
                 log.error("{} failed to send record to {}: {}", this, topic, e);
                 log.debug("{} Failed record: {}", this, preTransformRecord);
             } else {
                 log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
                         this,
                         recordMetadata.topic(), recordMetadata.partition(),
                         recordMetadata.offset());
                 commitTaskRecord(preTransformRecord);
             }
             recordSent(producerRecord);
             counter.completeRecord();
         }
      }
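
      One way to avoid committing offsets past a failed record is to fail fast instead of logging and continuing. The sketch below is a hypothetical illustration, not Kafka's actual fix: the callback stores the first exception in an AtomicReference, and the task thread checks it before polling or committing, so the task fails and the previously committed offsets stay in place. SendFailureTracker and maybeThrow() are invented names; a real Connect task would more likely throw a ConnectException than the IllegalStateException used here to keep the example dependency-free.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical fail-fast alternative to "log and continue".
// Not Kafka's actual fix; class and method names are illustrative.
class SendFailureTracker {
    // First send failure reported by any producer callback (runs on the producer I/O thread).
    private final AtomicReference<Exception> firstFailure = new AtomicReference<>();

    // Called from the producer callback instead of only logging the error.
    void onCompletion(Exception e) {
        if (e != null) {
            firstFailure.compareAndSet(null, e); // keep the first failure, ignore later ones
        }
    }

    // Called from the task thread before polling more records or committing offsets.
    void maybeThrow() {
        Exception e = firstFailure.get();
        if (e != null) {
            throw new IllegalStateException("send failed; failing task so offsets are not committed", e);
        }
    }

    public static void main(String[] args) {
        SendFailureTracker tracker = new SendFailureTracker();
        tracker.onCompletion(null); // successful send: nothing recorded
        tracker.onCompletion(new IllegalStateException("Producer is closed forcefully."));
        try {
            tracker.maybeThrow(); // the task fails instead of committing past the lost record
        } catch (IllegalStateException ex) {
            System.out.println("task failed: " + ex.getCause().getMessage());
        }
    }
}
```

      Because the check runs on the task thread, the failure surfaces where the framework can mark the task as failed, instead of being swallowed on the producer's network thread.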

      Example of an exception triggering the bug:

      2018-04-27 21:14:25,740 [kafka-producer-network-thread | source-23] ERROR o.a.k.c.runtime.WorkerSourceTask - source-23 failed to send record to topic-name: {}
      java.lang.IllegalStateException: Producer is closed forcefully.
              at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:610)
              at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:597)
              at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:183)
              at java.lang.Thread.run(Thread.java:748)
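
      The stack trace above shows the Sender thread calling abortIncompleteBatches() after a forced close. The hypothetical model below (ForceCloseModel is not a Kafka class) illustrates why retry and timeout settings do not help in that case: every pending send is completed with an exception immediately, without being retried.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a producer's pending sends; not Kafka's RecordAccumulator.
class ForceCloseModel {
    private final List<Consumer<Exception>> pendingCallbacks = new ArrayList<>();

    void send(Consumer<Exception> callback) {
        pendingCallbacks.add(callback); // queued, not yet acknowledged by a broker
    }

    // Analogous to abortIncompleteBatches(): fail every pending send, with no retry.
    void forceClose() {
        for (Consumer<Exception> callback : pendingCallbacks) {
            callback.accept(new IllegalStateException("Producer is closed forcefully."));
        }
        pendingCallbacks.clear();
    }

    public static void main(String[] args) {
        ForceCloseModel producer = new ForceCloseModel();
        int[] failures = {0};
        for (int i = 0; i < 3; i++) {
            producer.send(e -> {
                if (e != null) {
                    failures[0]++;
                }
            });
        }
        producer.forceClose();
        System.out.println("callbacks completed with an exception: " + failures[0]);
    }
}
```

      Running main prints callbacks completed with an exception: 3. Combined with the log-and-continue callback, each of those records is then treated as done.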
      

    People

      Assignee: Unassigned
      Reporter: Paul Davidson (pdavidson)
      Votes: 0
      Watchers: 1

              Dates

                Created:
                Updated:
                Resolved: