Details
- Type: Bug
- Status: Resolved
- Priority: Minor
- Resolution: Fixed
- Versions: 2.0.0, 1.2.1, 1.1.3
Description
aniket.alhat reported on the mailing list that he got an NPE when trying to start the Trident spout.
2018-05-22 06:23:02.318 o.a.s.util [ERROR] Async loop died!
java.lang.NullPointerException: null
    at org.apache.storm.kafka.spout.NamedTopicFilter.getFilteredTopicPartitions(NamedTopicFilter.java:57) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.ManualPartitionSubscription.refreshAssignment(ManualPartitionSubscription.java:54) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.ManualPartitionSubscription.subscribe(ManualPartitionSubscription.java:49) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager.createAndSubscribeKafkaConsumer(KafkaTridentSpoutManager.java:59) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:84) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:100) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque.getEmitter(KafkaTridentSpoutOpaque.java:50) ~[stormjar.jar:?]
    at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:97) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:221) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:39) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:60) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:245) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$fn__5043$fn__5056.invoke(executor.clj:803) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:482) [storm-core-1.2.1.jar:1.2.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
It looks to me like the partitionsFor method on the consumer will return null if the specified topic doesn't exist. We didn't account for this in the filter, because the return type of the method is a List, and we assumed it wouldn't be null.
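To illustrate the failure mode, here is a minimal sketch that does not use Kafka at all. The class NullPartitionListDemo, the map-backed partitionsFor stand-in, and the "topic-partition" string format are all hypothetical; the only point is that iterating a List-typed result that happens to be null throws the NPE.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NullPartitionListDemo {

    // Hypothetical stand-in for the consumer's metadata lookup. Like the
    // behavior described above, it returns null (not an empty list) when
    // the topic is unknown.
    public static List<Integer> partitionsFor(Map<String, List<Integer>> metadata, String topic) {
        return metadata.get(topic);
    }

    // Unguarded pattern: the result is iterated directly, on the assumption
    // that a List return type is never null.
    public static List<String> collectUnguarded(Map<String, List<Integer>> metadata, List<String> topics) {
        List<String> result = new ArrayList<>();
        for (String topic : topics) {
            // Throws NullPointerException when partitionsFor returns null.
            for (Integer partition : partitionsFor(metadata, topic)) {
                result.add(topic + "-" + partition);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> metadata = new HashMap<>();
        metadata.put("existing", Arrays.asList(0, 1));

        System.out.println(collectUnguarded(metadata, Arrays.asList("existing")));
        try {
            collectUnguarded(metadata, Arrays.asList("existing", "missing"));
        } catch (NullPointerException e) {
            System.out.println("NPE while iterating partitions of a missing topic");
        }
    }
}
```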
I think it's reasonable that people should be able to subscribe to topics that don't exist yet, and the spout should pick up the new topics eventually.
We should check for null here https://github.com/apache/storm/blob/93ed601425a79759c0189a945c6b46266e5c9ced/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java#L55, and maybe log a warning if the returned value is null.
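A rough sketch of what the null check could look like, under some assumptions: this is not the actual NamedTopicFilter code, the class NullSafeTopicFilterSketch is hypothetical, the partitionsFor lookup is passed in as a plain Function so the example runs without Kafka, and java.util.logging stands in for whatever logger the spout uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.logging.Logger;

public class NullSafeTopicFilterSketch {
    private static final Logger LOG = Logger.getLogger(NullSafeTopicFilterSketch.class.getName());

    // partitionsFor is a hypothetical stand-in for the consumer lookup; it may
    // return null for a topic that does not exist yet.
    public static List<String> getFilteredTopicPartitions(
            Function<String, List<Integer>> partitionsFor, List<String> topics) {
        List<String> allPartitions = new ArrayList<>();
        for (String topic : topics) {
            List<Integer> partitions = partitionsFor.apply(topic);
            if (partitions == null) {
                // Skip the topic instead of failing; a later refresh can pick it up.
                LOG.warning("Topic " + topic + " not found, skipping it for now");
                continue;
            }
            for (Integer partition : partitions) {
                allPartitions.add(topic + "-" + partition);
            }
        }
        return allPartitions;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> metadata = new HashMap<>();
        metadata.put("existing", Arrays.asList(0, 1));
        List<String> topics = Arrays.asList("existing", "missing");

        // First refresh: "missing" is skipped with a warning.
        System.out.println(getFilteredTopicPartitions(metadata::get, topics));

        // The topic is created later; the next refresh includes it.
        metadata.put("missing", Arrays.asList(0));
        System.out.println(getFilteredTopicPartitions(metadata::get, topics));
    }
}
```

Skipping the topic rather than throwing keeps the subscription alive, so a periodic assignment refresh would see the topic once it has been created.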