Apache Storm / STORM-3082

NamedTopicFilter can't handle topics that don't exist yet


Details

    Description

      aniket.alhat reported on the mailing list that he got a NullPointerException when trying to start the Kafka Trident spout.

      2018-05-22 06:23:02.318 o.a.s.util [ERROR] Async loop died!
      java.lang.NullPointerException: null
              at org.apache.storm.kafka.spout.NamedTopicFilter.getFilteredTopicPartitions(NamedTopicFilter.java:57) ~[stormjar.jar:?]
              at org.apache.storm.kafka.spout.ManualPartitionSubscription.refreshAssignment(ManualPartitionSubscription.java:54) ~[stormjar.jar:?]
              at org.apache.storm.kafka.spout.ManualPartitionSubscription.subscribe(ManualPartitionSubscription.java:49) ~[stormjar.jar:?]
              at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager.createAndSubscribeKafkaConsumer(KafkaTridentSpoutManager.java:59) ~[stormjar.jar:?]
              at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:84) ~[stormjar.jar:?]
              at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:100) ~[stormjar.jar:?]
              at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque.getEmitter(KafkaTridentSpoutOpaque.java:50) ~[stormjar.jar:?]
              at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:97) ~[storm-core-1.2.1.jar:1.2.1]
              at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:221) ~[storm-core-1.2.1.jar:1.2.1]
              at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:39) ~[storm-core-1.2.1.jar:1.2.1]
              at org.apache.storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:60) ~[storm-core-1.2.1.jar:1.2.1]
              at org.apache.storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:245) ~[storm-core-1.2.1.jar:1.2.1]
              at org.apache.storm.daemon.executor$fn__5043$fn__5056.invoke(executor.clj:803) ~[storm-core-1.2.1.jar:1.2.1]
              at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:482) [storm-core-1.2.1.jar:1.2.1]
              at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
              at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
      

      It looks to me like the partitionsFor method on the KafkaConsumer returns null if the specified topic doesn't exist. We didn't account for this in the filter: the method's return type is a List, and we assumed it would never be null.
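      To make the failure mode concrete, here is a minimal, self-contained reproduction. It does not use the Kafka client: a HashMap stands in for the consumer's topic metadata, because Map.get returns null for an unknown key, which is the behavior the description attributes to partitionsFor for a topic that doesn't exist. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reproduction; not Storm or Kafka code.
// A HashMap stands in for the consumer's topic metadata: get() returns null
// for an unknown topic, mirroring what partitionsFor is assumed to do.
final class NullListRepro {

    // Mirrors the unguarded loop: iterate whatever list the lookup returns.
    static int countPartitions(Map<String, List<Integer>> metadata, String topic) {
        int count = 0;
        // The enhanced for loop calls iterator() on the list,
        // so a null list throws NullPointerException here.
        for (Integer ignored : metadata.get(topic)) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> metadata = new HashMap<>();
        metadata.put("orders", Arrays.asList(0, 1));

        System.out.println(countPartitions(metadata, "orders")); // prints 2

        try {
            countPartitions(metadata, "not-yet-created");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException for a topic that does not exist");
        }
    }
}
```

      The declared List return type gives no hint that null is possible, which is why the loop compiles cleanly and only fails at runtime.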

      I think it's reasonable that people should be able to subscribe to topics that don't exist yet, and the spout should pick up those topics once they are created.

      We should check for null here https://github.com/apache/storm/blob/93ed601425a79759c0189a945c6b46266e5c9ced/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java#L55, and maybe log a warning if the returned value is null.
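      A minimal sketch of the proposed null check follows, assuming the filter loops over a fixed list of topic names and asks the consumer for each topic's partitions. To keep it runnable without the Kafka client, a Function stands in for consumer.partitionsFor, a small TopicPartition class stands in for Kafka's, and java.util.logging replaces whatever logger Storm actually uses. NullSafeTopicFilter and its members are hypothetical names, not the storm-kafka-client API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.logging.Logger;

// Hypothetical sketch; names are illustrative, not storm-kafka-client's API.
final class NullSafeTopicFilter {
    private static final Logger LOG = Logger.getLogger(NullSafeTopicFilter.class.getName());

    // Simplified stand-in for Kafka's TopicPartition (topic name + partition number).
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    private final List<String> topics;

    NullSafeTopicFilter(List<String> topics) {
        this.topics = topics;
    }

    // partitionsFor plays the role of consumer.partitionsFor(topic): it returns the
    // topic's partition numbers, or null if the topic does not exist (yet).
    Set<TopicPartition> getFilteredTopicPartitions(Function<String, List<Integer>> partitionsFor) {
        Set<TopicPartition> allPartitions = new HashSet<>();
        for (String topic : topics) {
            List<Integer> partitions = partitionsFor.apply(topic);
            if (partitions == null) {
                // Skip the missing topic instead of throwing; a later refresh can pick it up.
                LOG.warning("Topic " + topic + " does not exist yet; skipping it for now");
                continue;
            }
            for (int partition : partitions) {
                allPartitions.add(new TopicPartition(topic, partition));
            }
        }
        return allPartitions;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> metadata = new HashMap<>();
        metadata.put("orders", Arrays.asList(0, 1));
        NullSafeTopicFilter filter = new NullSafeTopicFilter(Arrays.asList("orders", "payments"));

        // "payments" does not exist yet: only the two "orders" partitions are returned.
        System.out.println(filter.getFilteredTopicPartitions(metadata::get).size()); // prints 2

        // Once the topic is created, the next call includes its partitions.
        metadata.put("payments", Arrays.asList(0));
        System.out.println(filter.getFilteredTopicPartitions(metadata::get).size()); // prints 3
    }
}
```

      Skipping the missing topic with a warning, rather than failing, keeps the spout alive and lets a subsequent assignment refresh include the topic once it exists, which is the behavior the description asks for.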

    People

      aniket.alhat Aniket Alhat
      srdo Stig Rohde Døssing
      Votes: 1
      Watchers: 2

    Time Tracking

      Estimated: Not Specified
      Remaining: 0h
      Logged: 2h