Apache Storm / STORM-3082

NamedTopicFilter can't handle topics that don't exist yet


    Details

      Description

      Aniket Alhat reported on the mailing list that he got an NPE when trying to start the Trident spout.

      2018-05-22 06:23:02.318 o.a.s.util [ERROR] Async loop died!
      java.lang.NullPointerException: null
              at org.apache.storm.kafka.spout.NamedTopicFilter.getFilteredTopicPartitions(NamedTopicFilter.java:57) ~[stormjar.jar:?]
              at org.apache.storm.kafka.spout.ManualPartitionSubscription.refreshAssignment(ManualPartitionSubscription.java:54) ~[stormjar.jar:?]
              at org.apache.storm.kafka.spout.ManualPartitionSubscription.subscribe(ManualPartitionSubscription.java:49) ~[stormjar.jar:?]
              at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager.createAndSubscribeKafkaConsumer(KafkaTridentSpoutManager.java:59) ~[stormjar.jar:?]
              at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:84) ~[stormjar.jar:?]
              at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:100) ~[stormjar.jar:?]
              at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque.getEmitter(KafkaTridentSpoutOpaque.java:50) ~[stormjar.jar:?]
              at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:97) ~[storm-core-1.2.1.jar:1.2.1]
              at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:221) ~[storm-core-1.2.1.jar:1.2.1]
              at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:39) ~[storm-core-1.2.1.jar:1.2.1]
              at org.apache.storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:60) ~[storm-core-1.2.1.jar:1.2.1]
              at org.apache.storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:245) ~[storm-core-1.2.1.jar:1.2.1]
              at org.apache.storm.daemon.executor$fn__5043$fn__5056.invoke(executor.clj:803) ~[storm-core-1.2.1.jar:1.2.1]
              at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:482) [storm-core-1.2.1.jar:1.2.1]
              at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
              at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
      

      It looks to me like the partitionsFor method on the consumer will return null if the specified topic doesn't exist. We didn't account for this in the filter, because the return type of the method is a List, and we assumed it wouldn't be null.

      I think it's reasonable that people should be able to subscribe to topics that don't exist yet, and the spout should pick up the new topics eventually.

      We should check for null here: https://github.com/apache/storm/blob/93ed601425a79759c0189a945c6b46266e5c9ced/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java#L55 and maybe log a warning if the returned value is null.
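
      A minimal sketch of the proposed null check, not the actual patch. To keep it runnable without Kafka on the classpath, the hypothetical partitionsFor function below stands in for the consumer's partitionsFor method, partitions are plain integers rather than PartitionInfo objects, and the warning goes to stderr instead of a logger. The class name is also made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the filter; not the actual Storm class.
class NullSafeTopicFilterSketch {

    // partitionsFor stands in for the consumer lookup: it maps a topic name to
    // its partition numbers, or returns null when the topic is unknown.
    static List<String> getFilteredTopicPartitions(
            Function<String, List<Integer>> partitionsFor, List<String> topics) {
        List<String> allPartitions = new ArrayList<>();
        for (String topic : topics) {
            List<Integer> partitions = partitionsFor.apply(topic);
            if (partitions == null) {
                // Topic does not exist yet: warn and skip it instead of hitting an NPE.
                System.err.println("WARN: topic " + topic + " not found, skipping for now");
                continue;
            }
            for (Integer partition : partitions) {
                allPartitions.add(topic + "-" + partition);
            }
        }
        return allPartitions;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> metadata = new HashMap<>();
        metadata.put("existing", Arrays.asList(0, 1));
        // "not-created-yet" has no metadata, so the lookup returns null for it.
        List<String> result = getFilteredTopicPartitions(
                metadata::get, Arrays.asList("existing", "not-created-yet"));
        System.out.println(result); // prints [existing-0, existing-1]
    }
}
```

      Skipping the missing topic rather than failing means a later refresh of the assignment can pick the topic up once it has been created, which is the behavior asked for above.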

              People

              • Assignee: Aniket Alhat (aniket.alhat)
              • Reporter: Stig Rohde Døssing (Srdo)
              • Votes: 1
              • Watchers: 2

                  Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 2h