Apache Storm / STORM-3046

Getting an NPE that causes the worker to die when starting a topology.

Details

    • Priority: Important

    Description

      I am using storm-core and storm-kafka-client version 1.2.1, and kafka-clients version 1.1.0.

      We have an external Kafka cluster from which we consume messages.
      Whenever I try to run the topology, I get an NPE, which causes the worker to die.
      If I set the first poll offset strategy to EARLIEST and the topic already contains some messages, it works fine.
      I am using a custom record translator, which works fine.

      Can someone please help me fix the issue?

      Thanks.
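
      For reference, a minimal sketch of the two configurations described above (the broker address and topic name are placeholders; the builder calls mirror the attached topology):

      import org.apache.storm.kafka.spout.KafkaSpoutConfig;
      import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;

      class PollStrategyExamples {

          // EARLIEST on a topic that already contains messages: the topology runs fine.
          static KafkaSpoutConfig<String, String> working() {
              return KafkaSpoutConfig.builder("localhost:9092", "test")
                      .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
                      .build();
          }

          // LATEST, so the first poll returns no records: the worker dies with the NPE below.
          static KafkaSpoutConfig<String, String> failing() {
              return KafkaSpoutConfig.builder("localhost:9092", "test")
                      .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
                      .build();
          }
      }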

       

      Error - 

      10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR o.a.s.util - Async loop died!
      java.lang.RuntimeException: java.lang.NullPointerException
      at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
      at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
      at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1]
      at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) ~[storm-core-1.2.1.jar:1.2.1]
      at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
      at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
      at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
      Caused by: java.lang.NullPointerException
      at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[storm-kafka-client-1.2.1.jar:1.2.1]
      at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[storm-kafka-client-1.2.1.jar:1.2.1]
      at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[storm-kafka-client-1.2.1.jar:1.2.1]
      at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) ~[storm-core-1.2.1.jar:1.2.1]
      at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1]
      at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1]
      at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1]
      at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) ~[storm-core-1.2.1.jar:1.2.1]
      at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
      at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
      ... 6 more

       

       

      Topology class -

      import org.apache.storm.Config;
      import org.apache.storm.LocalCluster;
      import org.apache.storm.StormSubmitter;
      import org.apache.storm.generated.*;
      import org.apache.storm.kafka.spout.KafkaSpoutConfig;
      import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
      import org.apache.storm.trident.Stream;
      import org.apache.storm.trident.TridentState;
      import org.apache.storm.trident.TridentTopology;
      import org.apache.storm.tuple.Fields;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

      import java.util.Properties;

       

      public class TestTopology {

          // Logger declaration added for completeness; the attachment uses "log" without declaring it.
          private static final Logger log = LoggerFactory.getLogger(TestTopology.class);

          private static StormTopology buildTopology(Properties stormProperties) {
              // getProperties(...) and getPartitionGroup(...) are helper methods elided from the attachment.
              Properties kafkaProperties = getProperties("/kafka.properties");
              TridentTopology topology = new TridentTopology();
              Fields stageArguments = new Fields("test", "issue");

              KafkaSpoutConfig<String, String> kafkaSpoutConfig =
                      KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), "test")
                              .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                              .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
                              .setRecordTranslator(new RecordTranslator(), stageArguments)
                              .build();

              KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new KafkaTridentSpoutOpaque(kafkaSpoutConfig);
              Grouping partitionGroup = getPartitionGroup("test");

              log.info("Creating Opaque-Trident-Kafka-Spout");
              final Stream kafkaSpout = topology
                      .newStream(stormProperties.getProperty("SPOUT_NAME"), kafkaTridentSpoutOpaque)
                      .name("kafkaSpout")
                      .parallelismHint(1);

              TridentState testUpdate = kafkaSpout
                      .partition(partitionGroup)
                      .name("testUpdate")
                      .partitionPersist(new MainMemoryStateFactory(), stageArguments,
                              new MainMemoryStateUpdater(), stageArguments)
                      .parallelismHint(1);

              // The attachment refers to an undeclared "ruleUpdate" here; "testUpdate" appears to be meant.
              Stream viewUpdate = testUpdate.newValuesStream()
                      .name("viewUpdate")
                      .partition(partitionGroup)
                      .each(stageArguments, new UpdateView(), new Fields())
                      .parallelismHint(2);

              return topology.build();
          }

          public static void main(String[] args) {
              Config conf = new Config();
              log.info("Topology config: " + conf);
              Properties properties = getProperties("/storm-cluster.properties");

              conf.setMessageTimeoutSecs(600);

              log.info("Building Topology");
              StormTopology topology = buildTopology(properties);
              log.info(topology.toString());

              log.info("Submitting handle-rule Topology");
              try {
                  LocalCluster cluster = new LocalCluster();
                  cluster.submitTopology("handle-rule", conf, topology);
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
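
      The attachment references a user-defined RecordTranslator class that is not included in the ticket. A minimal sketch of what such a translator could look like, assuming it simply maps each record's key and value to the two fields declared as stageArguments ("test", "issue"):

      import java.util.List;

      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.storm.kafka.spout.Func;
      import org.apache.storm.tuple.Values;

      // Hypothetical sketch only; the real custom translator is not attached.
      // It plugs into KafkaSpoutConfig.Builder.setRecordTranslator(func, fields).
      public class RecordTranslator implements Func<ConsumerRecord<String, String>, List<Object>> {

          @Override
          public List<Object> apply(ConsumerRecord<String, String> record) {
              // Emit the record key and value as the "test" and "issue" tuple fields.
              return new Values(record.key(), record.value());
          }
      }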

      Attachments

        1. TestTopology.java (3 kB) - Kush Khandelwal

        Activity

          srdo Stig Rohde Døssing added a comment -

          This is caused by a bad assumption I made here: https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L161.

          I assumed that the last batch metadata would only be null for the first batch, or for reemits of the first batch. I forgot to account for the case where the first batch contains no tuples, because then we set the metadata for the first batch to null (see https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L125). In the following call to emitBatch, we will hit this line with a null lastBatchMeta: https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L181. This is very likely to happen if you use e.g. LATEST as the FirstPollOffsetStrategy, and as you observe, it doesn't happen if you start at the beginning of a partition and there are messages to emit.

          Let me know if you'd like to try to fix it. If not, I'll be happy to give it a shot, and would appreciate it if you would try out the potential fix.
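
          To illustrate the missing guard, a simplified, hypothetical sketch (the names EmitterSeekSketch, seekOffset, and applyFirstPollStrategy are illustrative only; the actual fix lives in KafkaTridentSpoutEmitter):

          import java.util.Collections;

          import org.apache.kafka.clients.consumer.KafkaConsumer;
          import org.apache.kafka.common.TopicPartition;

          // Simplified stand-in for the emitter's seek logic. The last-batch metadata is
          // reduced to a nullable Long offset to avoid depending on internal classes.
          class EmitterSeekSketch {

              private final KafkaConsumer<String, String> consumer;

              EmitterSeekSketch(KafkaConsumer<String, String> consumer) {
                  this.consumer = consumer;
              }

              // lastEmittedOffset is null not only for the very first batch, but also
              // when the first batch contained no tuples, which is what triggers the NPE.
              long seekOffset(TopicPartition tp, Long lastEmittedOffset) {
                  if (lastEmittedOffset != null) {
                      // Normal case: resume immediately after the last emitted batch.
                      consumer.seek(tp, lastEmittedOffset + 1);
                  } else {
                      // Guard for the empty-first-batch case: fall back to the configured
                      // FirstPollOffsetStrategy instead of dereferencing null metadata.
                      applyFirstPollStrategy(tp);
                  }
                  return consumer.position(tp);
              }

              private void applyFirstPollStrategy(TopicPartition tp) {
                  // e.g. seekToBeginning for EARLIEST; seekToEnd shown here for LATEST.
                  consumer.seekToEnd(Collections.singleton(tp));
              }
          }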
          moroseking moroseking added a comment -

          If you use the opaque Trident spout, you must change some code to handle the condition where the last batch metadata is null.

          shaikasifullah Shaik Asifullah added a comment -

          I configured the firstPollOffsetStrategy as FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST, but the error still persists. Any suggestions for a quick fix when creating a Trident topology?

          srdo Stig Rohde Døssing added a comment -

          I'm happy to provide a patched 1.x branch. Could you check it out and see if the patch fixes the issue?

          srdo Stig Rohde Døssing added a comment -

          Here is the patched 1.x branch: https://github.com/srdo/storm/tree/STORM-3046-1.x. You should be able to check out and build storm-kafka-client from there.
          aniket.alhat Aniket Alhat added a comment -

          srdo, any plans on merging this and releasing it to 1.2.x?

          srdo Stig Rohde Døssing added a comment -

          aniket.alhat, I'll put up a 1.x version as soon as the PR targeting master has been reviewed and merged.
          kabhwan Jungtaek Lim added a comment -

          Thanks srdo, I merged it into master.

          The patch didn't apply cleanly to 1.x-branch. Could you please raise a PR for 1.x-branch as well? Thanks in advance!
          kabhwan Jungtaek Lim added a comment -

          Merged into 1.x-branch as well.

          People

            Assignee: srdo Stig Rohde Døssing
            Reporter: kush.kh Kush Khandelwal
            Votes: 1
            Watchers: 6


              Time Tracking

                Estimated: Not Specified
                Remaining: 0h
                Logged: 50m