Description
If a changelog contains a message with a null value (a delete), the SamzaContainer throws a NullPointerException while restoring it:
java.lang.NullPointerException
    at kafka.utils.Utils$.readBytes(Utils.scala:122)
    at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.addMessage(KafkaSystemConsumer.scala:173)
    at org.apache.samza.system.kafka.BrokerProxy$$anonfun$moveMessagesToTheirQueue$1.apply(BrokerProxy.scala:223)
    at org.apache.samza.system.kafka.BrokerProxy$$anonfun$moveMessagesToTheirQueue$1.apply(BrokerProxy.scala:222)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
    at org.apache.samza.system.kafka.BrokerProxy.moveMessagesToTheirQueue(BrokerProxy.scala:222)
    at org.apache.samza.system.kafka.BrokerProxy$$anonfun$org$apache$samza$system$kafka$BrokerProxy$$fetchMessages$1.apply(BrokerProxy.scala:156)
    at org.apache.samza.system.kafka.BrokerProxy$$anonfun$org$apache$samza$system$kafka$BrokerProxy$$fetchMessages$1.apply(BrokerProxy.scala:156)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:549)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at scala.collection.JavaConversions$JSetWrapper.foreach(JavaConversions.scala:642)
    at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:156)
    at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:120)
    at java.lang.Thread.run(Thread.java:619)
I believe it's due to this code:
    val message = if (msg.message.buffer != null) {
      deserializer.fromBytes(Utils.readBytes(msg.message.payload))
    } else {
      null
    }
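We check whether buffer != null, but then pass payload to Utils.readBytes. For a delete, payload appears to be null even though buffer is not, so the guard passes and readBytes is called with null. I believe we need to check whether payload is null instead.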
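A minimal sketch of the proposed guard, written without Kafka or Samza on the classpath. FakeMessage, NullPayloadSketch, readBytes, and decode are hypothetical stand-ins for illustration only; the assumption, taken from the description above, is that a delete carries a non-null buffer and a null payload. The point it shows is that guarding on payload, rather than buffer, lets a delete restore as null instead of throwing.

```scala
import java.nio.ByteBuffer

// Hypothetical stand-in for a Kafka message: buffer is always set,
// payload is assumed to be null when the message is a delete.
final case class FakeMessage(buffer: ByteBuffer, payload: ByteBuffer)

object NullPayloadSketch {
  // Copies the remaining bytes out of a buffer without moving its position.
  // Calling this with null throws a NullPointerException, like the trace above.
  def readBytes(buf: ByteBuffer): Array[Byte] = {
    val bytes = new Array[Byte](buf.remaining())
    buf.duplicate().get(bytes)
    bytes
  }

  // Guard on payload (not buffer), so a delete is passed through as null.
  def decode(msg: FakeMessage, fromBytes: Array[Byte] => String): String =
    if (msg.payload != null) fromBytes(readBytes(msg.payload)) else null
}
```

In KafkaSystemConsumer the equivalent change would presumably be to test msg.message.payload in the condition quoted above, leaving the rest of the expression as it is.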