Description
If you close a disconnected KafkaCheckpointManager, you get an NPE. This is because the Kafka producer is null when it's not connected.
Exception in thread "ThreadJob" java.lang.NullPointerException at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.stop(KafkaCheckpointManager.scala:215) at org.apache.samza.container.SamzaContainer.shutdownCheckpoints(SamzaContainer.scala:655) at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:510) at org.apache.samza.job.local.ThreadJob$$anon$1.run(ThreadJob.scala:42)
We just need to add an if statement here.