Apache Storm / STORM-2281

Running Multiple Kafka Spouts (Trident) Throws Illegal State Exception

    Details

      Description

      With the new-consumer Kafka spout in Trident, increasing the spout parallelism above 1 produces the error shown below.

      It reproduces most of the time; if it does not, kill and restart the topology. With a spout parallelism of 1 there is no problem; the error occurs only with multiple spout tasks.

      Steps to Reproduce:
      1. Create a spout-only Trident topology (a read/write topology also works).
      2. Create a topic with multiple partitions (2 or more).
      3. Publish some data to the topic and read it with a spout parallelism of 2 or more.

      No current assignment for partition input-1
      at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.daemon.executor$fn__8058$fn__8071$fn__8124.invoke(executor.clj:850) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
      at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
      at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]
      Caused by: java.lang.IllegalStateException: No current assignment for partition input-1
      at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231) ~[kafka-clients-0.10.0.0.jar:?]
      at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:256) ~[kafka-clients-0.10.0.0.jar:?]
      at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1134) ~[kafka-clients-0.10.0.0.jar:?]
      at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:139) ~[storm-kafka-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
      at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:88) ~[storm-kafka-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
      at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:47) ~[storm-kafka-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
      at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:128) ~[storm-core-1.0.2.jar:1.0.2]
      at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.0.2.jar:1.0.2]
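
      The "Caused by" frames show `KafkaConsumer.seek` rejecting a partition (`input-1`) that the calling consumer does not currently hold. The sketch below is a toy model of that check, not Kafka's or Storm's actual code: `ToyConsumer` and `seekIfAssigned` are hypothetical names introduced only for illustration, and partitions are plain strings. It shows the failure when a task seeks on a partition owned by another task, and one possible guard (checking the assignment first). Whether such a guard matches the eventual fix for this issue is an assumption.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class AssignmentSketch {

    /** Toy stand-in for a consumer's assignment state (hypothetical, not Kafka's SubscriptionState). */
    static final class ToyConsumer {
        private final Set<String> assigned = new HashSet<>();
        private final Map<String, Long> positions = new HashMap<>();

        ToyConsumer(String... partitions) {
            assigned.addAll(Arrays.asList(partitions));
        }

        Set<String> assignment() {
            return assigned;
        }

        /** Mirrors the behaviour seen in the trace: seeking an unassigned partition fails. */
        void seek(String partition, long offset) {
            if (!assigned.contains(partition)) {
                throw new IllegalStateException("No current assignment for partition " + partition);
            }
            positions.put(partition, offset);
        }

        /** Returns the last offset set by seek, or null if none was set. */
        Long position(String partition) {
            return positions.get(partition);
        }
    }

    /** Hypothetical guard: seek only if this consumer currently holds the partition. */
    static boolean seekIfAssigned(ToyConsumer consumer, String partition, long offset) {
        if (!consumer.assignment().contains(partition)) {
            return false; // partition belongs to another task; skip it
        }
        consumer.seek(partition, offset);
        return true;
    }

    public static void main(String[] args) {
        // With parallelism 2 and two partitions, assume each task holds one partition.
        ToyConsumer task0 = new ToyConsumer("input-0");

        System.out.println(seekIfAssigned(task0, "input-0", 42L)); // own partition
        System.out.println(seekIfAssigned(task0, "input-1", 42L)); // other task's partition

        try {
            task0.seek("input-1", 42L); // unguarded seek on a partition task0 does not hold
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

      With a parallelism of 1, a single consumer holds every partition, so the unguarded `seek` never hits the failing branch; this is consistent with the report that the error appears only with multiple spout tasks.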

            People

            • Assignee: Hugo Louro (hmclouro)
            • Reporter: Narendra Bidari (supermonk)
            • Votes: 0
            • Watchers: 5

            Time Tracking

            • Original Estimate: Not Specified
            • Remaining Estimate: 0h
            • Time Spent: 7h