Uploaded image for project: 'Apache Storm'
  1. Apache Storm
  2. STORM-2207

Kafka Spout NullPointerException during ack

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Duplicate
    • Affects Version/s: 1.0.2
    • Fix Version/s: None
    • Component/s: storm-kafka-client
    • Labels:
      None

      Description

      This occurs on startup of the topology. There should be some null check safeguards, but i'm not sure what's causing it to occur in the first place...my guess is the topic partition is not found in the ack map.

      2016-11-17 23:11:05.366 o.a.s.util [ERROR] Async loop died!
      java.lang.RuntimeException: java.lang.NullPointerException
      at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.daemon.executor$fn_7990$fn8005$fn_8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
      at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
      at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
      Caused by: java.lang.NullPointerException
      at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
      at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.daemon.executor$fn_7990$tuple_action_fn_7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
      ... 7 more
      2016-11-17 23:11:05.379 o.a.s.d.executor [ERROR]
      java.lang.RuntimeException: java.lang.NullPointerException
      at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.daemon.executor$fn_7990$fn8005$fn_8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
      at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
      at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
      Caused by: java.lang.NullPointerException
      at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
      at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.daemon.executor$fn_7990$tuple_action_fn_7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
      ... 7 more
      2016-11-17 23:11:05.473 o.a.s.util [ERROR] Halting process: ("Worker died")
      java.lang.RuntimeException: ("Worker died")
      at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.2.jar:1.0.2]
      at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
      at org.apache.storm.daemon.worker$fn_8663$fn_8664.invoke(worker.clj:765) [storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.daemon.executor$mk_executor_data$fn_7875$fn_7876.invoke(executor.clj:274) [storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:494) [storm-core-1.0.2.jar:1.0.2]
      at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
      at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]

      The method and line number in question below:

      @Override
      public void ack(Object messageId) {
      final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
      if (!consumerAutoCommitMode)

      { // Only need to keep track of acked tuples if commits are not done automatically acked.get(msgId.getTopicPartition()).add(msgId); }

      emitted.remove(msgId);
      }

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                Srdo Stig Rohde Døssing
                Reporter:
                logikz Nick Cuneo
              • Votes:
                0 Vote for this issue
                Watchers:
                9 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: