Apache Storm / STORM-1367

Issue with OpaqueTridentKafkaSpout: TridentKafkaConfig fails with java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException


Details

    Description

      I'm using a Trident topology with OpaqueTridentKafkaSpout.

      This is the TridentKafkaConfig code I'm using:

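      import backtype.storm.spout.SchemeAsMultiScheme;
      import storm.kafka.StringScheme;
      import storm.kafka.ZkHosts;
      import storm.kafka.trident.OpaqueTridentKafkaSpout;
      import storm.kafka.trident.TridentKafkaConfig;

      TridentKafkaConfig spoutConfig = new TridentKafkaConfig(new ZkHosts("xxx.x.x.9:2181,xxx.x.x.1:2181,xxx.x.x.2:2181"), "topic_name");
      spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // decode each message as a String
      spoutConfig.forceFromStart = true;      // start from startOffsetTime instead of the stored offset
      spoutConfig.fetchSizeBytes = 147483600; // roughly 140 MiB per fetch request
      OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConfig);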

      One of the workers throws this runtime exception:

      java.lang.RuntimeException: storm.kafka.UpdateOffsetException
          at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:135)
          at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106)
          at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
          at backtype.storm.daemon.executor$fn_5694$fn5707$fn5758.invoke(executor.clj:819)
          at backtype.storm.util$async_loop$fn545.invoke(util.clj:479)
          at clojure.lang.AFn.run(AFn.java:22)
          at java.lang.Thread.run(Thread.java:745)
      Caused by: storm.kafka.UpdateOffsetException
          at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:186)
          at storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132)
          at storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113)
          at storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72)
          at storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79)
          at storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafkaEmitter.java:46)
          at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:204)
          at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:194)
          at storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127)
          at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
          at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:370)
          at backtype.storm.daemon.executor$fn5694$tuple_action_fn5696.invoke(executor.clj:690)
          at backtype.storm.daemon.executor$mk_task_receiver$fn5615.invoke(executor.clj:436)
          at backtype.storm.disruptor$clojure_handler$reify_5189.onEvent(disruptor.clj:58)
          at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:127)
          ... 6 more
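      The trace shows the exception raised in KafkaUtils.fetchMessages and passing uncaught through TridentKafkaEmitter.fetchMessages until it surfaces as a RuntimeException in the executor loop. One common way to handle an out-of-range offset is to catch the error at fetch time and retry from the earliest offset the broker still retains. The sketch below models that catch-and-reset pattern with a self-contained, in-memory partition; OffsetResetSketch, InMemoryPartition, OffsetOutOfRange and fetchWithReset are hypothetical names, not storm-kafka classes, and the real emitter code may behave differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one Kafka partition; NOT the storm-kafka API.
public final class OffsetResetSketch {

    /** Thrown when a fetch asks for an offset the "broker" does not have. */
    static final class OffsetOutOfRange extends RuntimeException {
        OffsetOutOfRange(long offset) {
            super("offset out of range: " + offset);
        }
    }

    /** A partition whose oldest messages have already been removed by retention. */
    static final class InMemoryPartition {
        final long earliest; // first offset still retained
        final long latest;   // next offset to be written

        InMemoryPartition(long earliest, long latest) {
            this.earliest = earliest;
            this.latest = latest;
        }

        /** Returns up to maxCount offsets starting at offset, or throws if out of range. */
        List<Long> fetch(long offset, int maxCount) {
            if (offset < earliest || offset > latest) {
                throw new OffsetOutOfRange(offset);
            }
            List<Long> batch = new ArrayList<>();
            for (long o = offset; o < latest && batch.size() < maxCount; o++) {
                batch.add(o);
            }
            return batch;
        }
    }

    /**
     * Fetches a batch; if the requested offset has aged out, logs a warning and
     * retries once from the earliest retained offset instead of failing.
     * Skipping ahead silently drops the messages that retention deleted.
     */
    static List<Long> fetchWithReset(InMemoryPartition p, long offset, int maxCount) {
        try {
            return p.fetch(offset, maxCount);
        } catch (OffsetOutOfRange e) {
            System.err.println("WARN " + e.getMessage() + ", resetting to " + p.earliest);
            return p.fetch(p.earliest, maxCount);
        }
    }

    public static void main(String[] args) {
        // Retention has already deleted offsets 0..999.
        InMemoryPartition p = new InMemoryPartition(1_000L, 1_010L);
        System.out.println(fetchWithReset(p, 500L, 3)); // [1000, 1001, 1002]
    }
}
```

      This trades completeness for availability: the topology keeps running, but any messages deleted before they were processed are lost, which conflicts with a strict exactly-once requirement.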

      When I set spoutConfig.forceFromStart = true, it works fine for a while and then fails with this exception. I need the Trident topology to provide accurate exactly-once processing, even when the topology is restarted.

      Following suggestions in some posts, I have also tried these spoutConfig settings:

      spoutConfig.maxOffsetBehind = Long.MAX_VALUE;

      spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
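      To make the effect of these two settings concrete, here is a hypothetical, simplified model of choosing a partition's starting offset. StartOffsetSketch and chooseStartOffset are illustrative names, not storm-kafka code; the only values taken from Kafka are the sentinels -2 and -1 returned by kafka.api.OffsetRequest.EarliestTime() and LatestTime(). In this model, maxOffsetBehind = Long.MAX_VALUE means a saved offset is always resumed, even one that retention has already deleted, while startOffsetTime only matters when there is no usable saved offset.

```java
// Hypothetical model of picking a starting offset from the settings above.
// This is NOT storm-kafka's implementation; it only illustrates what each
// setting is meant to influence.
public final class StartOffsetSketch {

    // Same sentinel values as kafka.api.OffsetRequest.EarliestTime()/LatestTime().
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    /**
     * @param storedOffset    offset saved by a previous run, or null if none
     * @param earliest        first offset still retained by the broker
     * @param latest          next offset to be written
     * @param startOffsetTime EARLIEST_TIME or LATEST_TIME
     * @param maxOffsetBehind how far behind latest a saved offset may be
     */
    static long chooseStartOffset(Long storedOffset, long earliest, long latest,
                                  long startOffsetTime, long maxOffsetBehind) {
        long fallback = (startOffsetTime == EARLIEST_TIME) ? earliest : latest;
        if (storedOffset == null) {
            return fallback; // nothing saved yet: use startOffsetTime
        }
        if (latest - storedOffset > maxOffsetBehind) {
            return fallback; // too far behind: abandon the saved position
        }
        // Otherwise resume where we left off. Nothing here checks that
        // storedOffset >= earliest, so a deleted position is still returned
        // and the first fetch from it will be out of range.
        return storedOffset;
    }

    public static void main(String[] args) {
        // Saved offset 500, but retention already removed everything below 1000.
        long start = chooseStartOffset(500L, 1_000L, 5_000L, EARLIEST_TIME, Long.MAX_VALUE);
        System.out.println(start); // 500
    }
}
```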

      My Kafka retention time is the default, 168 hours (7 days), and the Kafka producer writes 6800 messages/second to the topic consumed by the Storm/Trident topology. I have gone through most of the related posts, but none of them solves this issue.
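      As a back-of-the-envelope check, the snippet below estimates how many messages fall inside the retention window. It assumes a steady 6800 messages/second across the whole topic and Kafka's default log.retention.hours of 168 (7 days); it ignores how messages are split across partitions and the fact that Kafka deletes whole log segments, so the real window is somewhat larger. RetentionEstimate and retainedMessages are illustrative names.

```java
// Back-of-the-envelope retention arithmetic for the numbers in this report.
// Assumptions: a steady 6800 msg/s topic-wide and log.retention.hours = 168.
public final class RetentionEstimate {

    /** Messages produced during the retention window (topic-wide). */
    static long retainedMessages(long messagesPerSecond, long retentionHours) {
        long retentionSeconds = retentionHours * 3600L; // 168 h = 604,800 s
        return messagesPerSecond * retentionSeconds;
    }

    public static void main(String[] args) {
        long retained = retainedMessages(6800L, 168L);
        System.out.println("Messages retained: " + retained); // 4112640000
    }
}
```

      So roughly 4.1 billion messages sit in the window. A partition whose saved offset is older than the earliest retained offset, for example because the topology lagged or was stopped for longer than the retention period, gets an out-of-range error on its next fetch.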


          People

            sriharsha Harsha
            anavalamudi Rakesh Surendra