Uploaded image for project: 'Apache Storm'
  1. Apache Storm
  2. STORM-586

Trident kafka spout fails instead of updating offset when kafka offset is out of range.

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Critical
    • Resolution: Fixed
    • 0.9.3
    • 0.10.0
    • storm-kafka
    • None

    Description

      Trident KafkaEmitter does not catch the newly added UpdateOffsetException which results in the spout failing repeatedly instead of automatically updating the offset to earliest time.

      PROBLEM: Exception while using the Trident Kafka Spout.

      2014-12-04 18:38:03 b.s.util ERROR Async loop died! 
      java.lang.RuntimeException: storm.kafka.UpdateOffsetException 
      at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
      at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
      at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
      at backtype.storm.daemon.executor$fn_4195$fn4207$fn_4254.invoke(executor.clj:745) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
      at backtype.storm.util$async_loop$fn__442.invoke(util.clj:436) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
      at clojure.lang.AFn.run(AFn.java:24) clojure-1.4.0.jar:na 
      at java.lang.Thread.run(Thread.java:745) na:1.7.0_71 
      Caused by: storm.kafka.UpdateOffsetException: null 
      at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:186) ~stormjar.jar:na 
      at storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132) ~stormjar.jar:na 
      at storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113) ~stormjar.jar:na 
      at storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72) ~stormjar.jar:na 
      at storm.kafka.trident.TridentKafkaEmitter.access$400(TridentKafkaEmitter.java:46) ~stormjar.jar:na 
      at storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:233) ~stormjar.jar:na 
      at storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatchNew(TridentKafkaEmitter.java:225) ~stormjar.jar:na 
      at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter$1.init(PartitionedTridentSpoutExecutor.java:125) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
      at storm.trident.topology.state.RotatingTransactionalState.getState(RotatingTransactionalState.java:83) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
      at storm.trident.topology.state.RotatingTransactionalState.getStateOrCreate(RotatingTransactionalState.java:110) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
      at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:121) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
      at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
      at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
      at backtype.storm.daemon.executor$fn_4195$tuple_action_fn_4197.invoke(executor.clj:630) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
      at backtype.storm.daemon.executor$mk_task_receiver$fn__4118.invoke(executor.clj:398) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
      at backtype.storm.disruptor$clojure_handler$reify__723.onEvent(disruptor.clj:58) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
      at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:99) ~storm-core-0.9.1.2.1.7.0-784.jar:0.9.1.2.1.7.0-784 
      ... 6 common frames omitted 
      2014-12-04 18:38:03 b.s.d.executor ERROR 
      java.lang.RuntimeException: storm.kafka.UpdateOffsetException 
      at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) ~[storm-core-0.9.1.2.1.7.0
      

      Attachments

        Issue Links

          Activity

            People

              parth.brahmbhatt Parth Brahmbhatt
              parth.brahmbhatt Parth Brahmbhatt
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: