  1. Apache Storm
  2. STORM-2207

Kafka Spout NullPointerException during ack

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Duplicate
    • Affects Version/s: 1.0.2
    • Fix Version/s: None
    • Component/s: storm-kafka-client
    • Labels: None

    Description

      This occurs on startup of the topology. There should be some null-check safeguards, but I'm not sure what causes it in the first place. My guess is that the topic partition is not found in the ack map.

      2016-11-17 23:11:05.366 o.a.s.util [ERROR] Async loop died!
      java.lang.RuntimeException: java.lang.NullPointerException
      at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.daemon.executor$fn_7990$fn8005$fn_8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
      at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
      at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
      Caused by: java.lang.NullPointerException
      at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
      at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.daemon.executor$fn_7990$tuple_action_fn_7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
      ... 7 more
      2016-11-17 23:11:05.379 o.a.s.d.executor [ERROR]
      java.lang.RuntimeException: java.lang.NullPointerException
      at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.daemon.executor$fn_7990$fn8005$fn_8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
      at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
      at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
      Caused by: java.lang.NullPointerException
      at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
      at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.daemon.executor$fn_7990$tuple_action_fn_7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
      ... 7 more
      2016-11-17 23:11:05.473 o.a.s.util [ERROR] Halting process: ("Worker died")
      java.lang.RuntimeException: ("Worker died")
      at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.2.jar:1.0.2]
      at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
      at org.apache.storm.daemon.worker$fn_8663$fn_8664.invoke(worker.clj:765) [storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.daemon.executor$mk_executor_data$fn_7875$fn_7876.invoke(executor.clj:274) [storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:494) [storm-core-1.0.2.jar:1.0.2]
      at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
      at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]

      The method in question is below (the trace points at KafkaSpout.java:316):

      @Override
      public void ack(Object messageId) {
          final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
          if (!consumerAutoCommitMode) {
              // Only need to keep track of acked tuples if commits are not done automatically
              acked.get(msgId.getTopicPartition()).add(msgId);
          }
          emitted.remove(msgId);
      }
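
The null-check safeguard suggested in the description could look roughly like the sketch below. It is a minimal, self-contained illustration and not the actual KafkaSpout code or the fix applied upstream: the class `AckGuardSketch`, its string-keyed maps, and the helper methods are all hypothetical stand-ins for the spout's `acked` and `emitted` bookkeeping. It assumes the reporter's guess is right, namely that an ack can arrive for a topic partition that has no entry in the ack map.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the spout's ack bookkeeping; not the Storm API.
class AckGuardSketch {
    // Partition name -> offsets acked but not yet committed.
    private final Map<String, Set<Long>> acked = new HashMap<>();
    // Messages emitted and still awaiting an ack, encoded as "partition@offset".
    private final Set<String> emitted = new HashSet<>();

    // Registers a partition, as would happen when it is assigned to this consumer.
    void assign(String partition) {
        acked.putIfAbsent(partition, new HashSet<>());
    }

    void emit(String partition, long offset) {
        emitted.add(partition + "@" + offset);
    }

    // Returns true if the ack was recorded, false if the partition was unknown.
    boolean ack(String partition, long offset) {
        Set<Long> offsets = acked.get(partition);
        boolean recorded = false;
        if (offsets != null) {
            offsets.add(offset);
            recorded = true;
        }
        // Always clear the pending entry, even when the partition is unknown.
        emitted.remove(partition + "@" + offset);
        return recorded;
    }

    int ackedCount(String partition) {
        Set<Long> offsets = acked.get(partition);
        return offsets == null ? 0 : offsets.size();
    }

    int pendingCount() {
        return emitted.size();
    }

    public static void main(String[] args) {
        AckGuardSketch spout = new AckGuardSketch();
        spout.assign("topic-0");
        spout.emit("topic-0", 5L);
        spout.emit("topic-1", 7L); // "topic-1" was never assigned
        System.out.println(spout.ack("topic-0", 5L));
        System.out.println(spout.ack("topic-1", 7L)); // skipped instead of throwing
    }
}
```

A guard like this only keeps the worker from dying. It does not explain why the partition is missing from the map, so logging the skipped ack would be advisable while the root cause is investigated.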

            People

              Assignee: Stig Rohde Døssing (srdo)
              Reporter: Nick Cuneo (logikz)