Details
Bug
Status: Open
Major
Resolution: Unresolved
0.9.0
None
None
Description
In the current KafkaSystemProducer design, a non-retriable exception can be thrown from the Kafka producer send thread, which creates race conditions in the following code blocks:
82 sendFailed.set(false)
83
84 retryBackoff.run(
85   loop => {
86     if (sendFailed.get()) {
87       throw exceptionThrown.get()
88     }
And
91 def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
92   if (exception == null) {
93     //send was successful. Don't retry
94     metrics.sendSuccess.inc
95   } else {
96     //If there is an exception in the callback, it means that the Kafka producer has exhausted the max-retries
97     //Hence, fail container!
98     exceptionThrown.compareAndSet(null, exception)
99     sendFailed.set(true)
100   }
101 }
The main thread sets sendFailed on line 82 and reads it on line 86, while the Kafka send thread sets it on line 99.
There could be two race conditions here:
1) The Kafka send thread completes line 99 and then the main thread executes line 82, which resets the flag, so the exception is missed.
2) The main thread finishes line 82 for the current message, and then the Kafka send thread executes line 99 for the previous message. In this case, the main thread gets an exception that belongs to the previous message, not the current one.
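The two interleavings can be replayed deterministically on a single thread. The following is only a minimal sketch, not the actual KafkaSystemProducer code: the object SendFailedRaceSketch and its helper methods are hypothetical names introduced for illustration, and only the sendFailed and exceptionThrown fields and the operations on lines 82, 86, 98 and 99 are taken from the excerpts above.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical stand-in for the shared state in KafkaSystemProducer.
// Each method call represents one step of one thread, executed in a fixed order.
object SendFailedRaceSketch {
  val sendFailed = new AtomicBoolean(false)
  val exceptionThrown = new AtomicReference[Exception]()

  // Mirrors lines 98-99: what the Kafka send thread does when a send fails.
  def callbackFailure(e: Exception): Unit = {
    exceptionThrown.compareAndSet(null, e)
    sendFailed.set(true)
  }

  // Mirrors line 82: the main thread resets the flag at the start of a send.
  def beginSend(): Unit = sendFailed.set(false)

  // Mirrors line 86: the main thread checks the flag inside the retry loop.
  def mainThreadSeesFailure: Boolean = sendFailed.get()

  private def reset(): Unit = {
    sendFailed.set(false)
    exceptionThrown.set(null)
  }

  // Race 1: the callback for message 1 fails, then the main thread starts message 2.
  def lostException(): Boolean = {
    reset()
    callbackFailure(new RuntimeException("message 1 rejected"))
    beginSend()            // line 82 for message 2 clears the flag
    mainThreadSeesFailure  // the failure of message 1 is no longer visible
  }

  // Race 2: the main thread starts message 2, then the late callback for message 1 fails.
  def misattributedException(): Option[String] = {
    reset()
    beginSend()            // line 82 for message 2
    callbackFailure(new RuntimeException("message 1 rejected"))
    if (mainThreadSeesFailure) Some(exceptionThrown.get().getMessage) else None
  }

  def main(args: Array[String]): Unit = {
    println(s"race 1, failure visible to main thread: ${lostException()}")
    println(s"race 2, exception seen while sending message 2: ${misattributedException()}")
  }
}
```

In the first interleaving the flag is cleared before the main thread ever checks it, so the container keeps running. In the second, the main thread throws while sending message 2, but the exception it carries was produced by message 1.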
A configuration that can trigger these races is:
systems.kafka.producer.max.request.size=102400
Broker side:
message.max.bytes=10240
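Inside task.process(), we send a 16KB message first, then a small message. The 16KB message is below the producer-side limit but above the broker-side limit, so the broker rejects it and the send callback receives a non-retriable exception while the main thread has already moved on to the small message.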