Kafka / KAFKA-5961

NullPointerException when the restore consumer reads messages with a null key

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.2.1
    • Fix Version/s: 0.11.0.0, 0.11.0.1
    • Component/s: streams
    • Labels:
      None

      Description

      If you have a Kafka Streams application that uses:

      stream.table("topicA")
      

      While the application is running, sending a message with a null key to topicA works fine. However, when the application is restarted and the restore consumer reads topicA from the beginning, it crashes because the null-key record is not filtered out during restoration.

      I know it isn't normal to send a null key to a topic that backs a table, but it can happen occasionally, and I think Kafka Streams should protect itself against it.
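
      To illustrate the kind of protection I mean, here is a minimal, self-contained sketch. It is not Kafka Streams code: NullKeySafeRestore, StrictStore and restore are hypothetical names, and StrictStore only stands in for a store that rejects null keys, as RocksDB.put does in the stack trace below. The restore loop skips records with a null key instead of passing them to the store.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the restore path; NOT the actual Kafka Streams implementation.
final class NullKeySafeRestore {

    // Assumption: models a store that throws NullPointerException on a null key.
    static final class StrictStore {
        private final Map<String, String> data = new HashMap<>();

        void put(String key, String value) {
            Objects.requireNonNull(key, "key");
            data.put(key, value);
        }

        String get(String key) {
            return data.get(key);
        }

        int size() {
            return data.size();
        }
    }

    // Replays records into the store, skipping any record whose key is null.
    // Returns the number of skipped records so the caller can log them.
    static int restore(List<Map.Entry<String, String>> records, StrictStore store) {
        int skipped = 0;
        for (Map.Entry<String, String> record : records) {
            if (record.getKey() == null) {
                skipped++;
                continue;
            }
            store.put(record.getKey(), record.getValue());
        }
        return skipped;
    }

    public static void main(String[] args) {
        // Simulated contents of topicA, including one record with a null key.
        List<Map.Entry<String, String>> topicA = new ArrayList<>();
        topicA.add(new SimpleEntry<>("a", "1"));
        topicA.add(new SimpleEntry<>(null, "orphan"));
        topicA.add(new SimpleEntry<>("b", "2"));

        StrictStore store = new StrictStore();
        int skipped = restore(topicA, store);
        System.out.println("restored=" + store.size() + " skipped=" + skipped);
    }
}
```

      Without the null check in restore, the second record would reach StrictStore.put and throw a NullPointerException, which is the same failure mode as in the trace. Whether to skip silently, log, or count such records is a design choice left open here.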

      This is the stack trace:

      ConsumerCoordinator [ERROR] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group my-cep-app_enricher failed on partition assignment
      java.lang.NullPointerException
      	at org.rocksdb.RocksDB.put(RocksDB.java:488)
      	at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
      	at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
      	at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
      	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
      	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
      	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
      	at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
      	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
      	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
      	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
      	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
      	at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
      	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
      	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
      	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
      	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
      	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
      	at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
      	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
      	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
      	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
      

            People

            • Assignee:
              Unassigned
              Reporter:
              agomez Andres Gomez Ferrer