Details
- Bug
- Status: Resolved
- Major
- Resolution: Duplicate
- 0.11.0.1, 1.1.0, 2.0.1
- None
- None
- Linux, JDK 8
Description
The producer callback in "WorkerSourceTask" handles an exception from send() by logging at ERROR level and continuing. As a result, offsets can be committed for records that were never sent successfully. Those records are effectively skipped, which causes data loss in our use case.
The comment in the callback's "onCompletion()" method says this should "basically never happen ... callbacks with exceptions should never be invoked in practice", but we have seen it happen several times in production, especially when the heap is nearly exhausted and the Sender thread throws an exception (often caused by KAFKA-6551).
From WorkerSourceTask line 253:
    new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                // Given the default settings for zero data loss, this should basically never happen --
                // between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
                // timeouts, callbacks with exceptions should never be invoked in practice. If the
                // user overrode these settings, the best we can do is notify them of the failure via
                // logging.
                log.error("{} failed to send record to {}: {}", this, topic, e);
                log.debug("{} Failed record: {}", this, preTransformRecord);
            } else {
                log.trace("{} Wrote record successfully: topic {} partition {} offset {}",
                        this,
                        recordMetadata.topic(), recordMetadata.partition(),
                        recordMetadata.offset());
                commitTaskRecord(preTransformRecord);
            }
            recordSent(producerRecord);
            counter.completeRecord();
        }
    }
Example of an exception triggering the bug:
    2018-04-27 21:14:25,740 [kafka-producer-network-thread | source-23] ERROR o.a.k.c.runtime.WorkerSourceTask - source-23 failed to send record to topic-name: {}
    java.lang.IllegalStateException: Producer is closed forcefully.
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:610)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:597)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:183)
        at java.lang.Thread.run(Thread.java:748)
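To illustrate the kind of behavior we would expect instead, here is a minimal stdlib-only sketch. It is not Kafka Connect code and not necessarily how this should be fixed: "SendFailureSketch", "SendCallback", "Tracker", "callbackFor" and "commitOffsets" are hypothetical names made up for this example, and "SendCallback" is only a stand-in for the producer callback. The idea is that the callback remembers the first send failure, and the commit path refuses to commit offsets once a failure has been recorded, rather than logging and moving on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Stdlib-only sketch; none of these types are Kafka or Kafka Connect classes. */
class SendFailureSketch {

    /** Hypothetical stand-in for a producer callback: (offset, exception). */
    interface SendCallback {
        void onCompletion(Long offset, Exception e);
    }

    /** Tracks acknowledged records and the first send failure, if any. */
    static class Tracker {
        private final List<String> acked = new ArrayList<>();
        private final AtomicReference<Exception> firstFailure = new AtomicReference<>();

        SendCallback callbackFor(String record) {
            return (offset, e) -> {
                if (e != null) {
                    // Remember the failure instead of only logging it.
                    firstFailure.compareAndSet(null, e);
                } else {
                    synchronized (acked) {
                        acked.add(record);
                    }
                }
            };
        }

        /** Returns the acknowledged records, or throws if any send has failed. */
        List<String> commitOffsets() {
            Exception failure = firstFailure.get();
            if (failure != null) {
                throw new IllegalStateException("send failed; refusing to commit offsets", failure);
            }
            synchronized (acked) {
                return new ArrayList<>(acked);
            }
        }
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker();
        // First record is acknowledged, second one fails in the callback.
        tracker.callbackFor("record-1").onCompletion(0L, null);
        tracker.callbackFor("record-2")
               .onCompletion(null, new IllegalStateException("Producer is closed forcefully."));
        try {
            tracker.commitOffsets();
            System.out.println("offsets committed");
        } catch (IllegalStateException ex) {
            System.out.println("commit refused: " + ex.getCause().getMessage());
        }
    }
}
```

With this shape, the failed record stays visible to whatever drives the commit, so the task can stop or retry instead of committing offsets past a record that was never written.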
Issue Links
- relates to KAFKA-8586 Source task producers silently fail to send records (Resolved)