Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
1.7.0
None
Description
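When an event is sent to Kafka with SinkCallback while log4j is at DEBUG level, and Kafka responds with an exception instead of a RecordMetadata, the Kafka Sink throws an NPE. The callback's debug logging dereferences metadata without checking it, and the producer passes null for metadata whenever the send fails.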
Code in SinkCallback:
    if (logger.isDebugEnabled()) {
      long eventElapsedTime = System.currentTimeMillis() - startTime;
      logger.debug("Acked message partition:{} ofset:{}", metadata.partition(), metadata.offset());
      logger.debug("Elapsed time for send: {}", eventElapsedTime);
    }
Code in Kafka Producer:
    if (exception == null) {
      RecordMetadata metadata = new RecordMetadata(...);
      thunk.callback.onCompletion(metadata, null);
    } else {
      thunk.callback.onCompletion(null, exception);
    }
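One possible way to avoid the NPE is to check metadata for null before the debug logging touches it. The sketch below is only an illustration and not necessarily the fix that was committed. To stay runnable without Kafka or Flume on the classpath, it uses hypothetical stand-ins: the class NullSafeCallbackSketch, the nested Metadata type (in place of RecordMetadata) and the describe method (in place of the logging inside onCompletion) are all assumptions.

```java
public class NullSafeCallbackSketch {

    // Hypothetical stand-in for Kafka's RecordMetadata; only the two
    // accessors used by the debug message are modelled.
    static final class Metadata {
        private final int partition;
        private final long offset;

        Metadata(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        int partition() { return partition; }
        long offset() { return offset; }
    }

    // Hypothetical helper mirroring the callback's debug output.
    // The producer passes either (metadata, null) or (null, exception),
    // so each argument is checked before it is dereferenced.
    static String describe(Metadata metadata, Exception exception) {
        if (exception != null) {
            return "Send failed: " + exception.getMessage();
        }
        if (metadata != null) {
            return "Acked message partition:" + metadata.partition()
                    + " offset:" + metadata.offset();
        }
        return "No metadata and no exception";
    }

    public static void main(String[] args) {
        // Successful send: metadata is present, exception is null.
        System.out.println(describe(new Metadata(3, 42L), null));
        // Failed send: metadata is null, which used to trigger the NPE.
        System.out.println(describe(null, new RuntimeException("boom")));
    }
}
```

In the real SinkCallback the same idea would amount to guarding the "Acked message" debug line with a metadata != null check, while the elapsed-time line can be logged in either case.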