Description
I found a closed issue and replied there, but decided to open a new one because, although the two are related, they are slightly different. The original issue is at https://issues.apache.org/jira/browse/KAFKA-7678
The fix there was to add a null check around closing the producer, because in some cases the producer is already null at that point (it has already been closed).
In version 2.1.1 we are getting a very similar exception, but in the flush method that is called just before close. This is in the log:
message: stream-thread [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed while closing StreamTask 1_26 due to the following error:
logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks
java.lang.NullPointerException: null
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758)
Followed by:
message: task [1_26] Could not close task due to the following error:
logger_name: org.apache.kafka.streams.processor.internals.StreamTask
java.lang.NullPointerException: null
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758)
If I look at the source code at this point, I see a nice null check in the close method, but not in the flush method that is called just before that:
public void flush() {
    this.log.debug("Flushing producer");
    this.producer.flush();
    this.checkForException();
}

public void close() {
    this.log.debug("Closing producer");
    if (this.producer != null) {
        this.producer.close();
        this.producer = null;
    }
    this.checkForException();
}
To my (admittedly ignorant) eye, it seems that the flush method should also be wrapped in a null check, in the same way as has been done for close.
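For illustration, a guard mirroring the one in close() might look roughly like this. It is only a sketch of the idea against the snippet above, not a tested patch for the actual RecordCollectorImpl:

public void flush() {
    this.log.debug("Flushing producer");
    if (this.producer != null) {
        // mirror the guard in close(): skip flushing if the producer is already gone
        this.producer.flush();
    }
    this.checkForException();
}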
Thanks for reporting this. What I don't understand is why we would flush after we have already closed the task. Hence, I am not sure whether a null guard is the correct fix; it might be better to make sure we don't call flush() in the first place.
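For illustration only, a simplified, hypothetical sketch of that alternative (the names below are made up for the example and are not the actual Kafka Streams internals): track whether the task has already been closed and skip the flush entirely, rather than null-guarding inside RecordCollectorImpl.flush():

public class TaskCloseSketch {

    private Producer producer = new Producer();
    private boolean closed = false;

    public void flushState() {
        if (closed) {
            return; // task already closed: never touch the (null) producer again
        }
        producer.flush();
    }

    public void close() {
        if (closed) {
            return; // idempotent: a repeated close neither flushes nor closes again
        }
        flushState();       // flush exactly once, while the producer still exists
        producer.close();
        producer = null;
        closed = true;
    }

    // minimal stand-in so the sketch compiles on its own
    private static class Producer {
        void flush() { }
        void close() { }
    }
}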
Can you maybe provide debug-level logs? That might help us understand the scenario better.