Uploaded image for project: 'Samza'
  1. Samza
  2. SAMZA-173

Restoring a changelog with a null value triggers an NPE

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 0.6.0
    • 0.7.0
    • kv
    • None

    Description

      If a changelog has a null value in it (a delete), an NPE is triggered when the SamzaContainer restores it:

      java.lang.NullPointerException
      	at kafka.utils.Utils$.readBytes(Utils.scala:122)
      	at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.addMessage(KafkaSystemConsumer.scala:173)
      	at org.apache.samza.system.kafka.BrokerProxy$$anonfun$moveMessagesToTheirQueue$1.apply(BrokerProxy.scala:223)
      	at org.apache.samza.system.kafka.BrokerProxy$$anonfun$moveMessagesToTheirQueue$1.apply(BrokerProxy.scala:222)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      	at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
      	at org.apache.samza.system.kafka.BrokerProxy.moveMessagesToTheirQueue(BrokerProxy.scala:222)
      	at org.apache.samza.system.kafka.BrokerProxy$$anonfun$org$apache$samza$system$kafka$BrokerProxy$$fetchMessages$1.apply(BrokerProxy.scala:156)
      	at org.apache.samza.system.kafka.BrokerProxy$$anonfun$org$apache$samza$system$kafka$BrokerProxy$$fetchMessages$1.apply(BrokerProxy.scala:156)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      	at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:549)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
      	at scala.collection.JavaConversions$JSetWrapper.foreach(JavaConversions.scala:642)
      	at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:156)
      	at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:120)
      	at java.lang.Thread.run(Thread.java:619)
      

      I believe it's due to this code:

            val message = if (msg.message.buffer != null) {
              deserializer.fromBytes(Utils.readBytes(msg.message.payload))
            } else {
              null
            }
      

      We are checking if buffer != null, but then passing in payload. I believe we need to check if payload is null instead.

      Attachments

        1. SAMZA-173.0.patch
          5 kB
          Chris Riccomini
        2. SAMZA-173.1.patch
          5 kB
          Chris Riccomini

        Activity

          People

            criccomini Chris Riccomini
            criccomini Chris Riccomini
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: