Spark / SPARK-19976

DirectStream API throws OffsetOutOfRange Exception

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 2.0.0
    • Fix Version/s: None
    • Component/s: DStreams
    • Labels: None
    • Flags: Important

    Description

      I am using the following code. When data on the Kafka topic gets deleted or the retention period expires, an exception is thrown and the application crashes.
      def functionToCreateContext(sc: SparkContext): StreamingContext = {
        // Create the StreamingContext to return (the 10-second batch interval is illustrative)
        val ssc = new StreamingContext(sc, Seconds(10))

        val kafkaParams = new mutable.HashMap[String, Object]()
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerGrp)
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

        val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topic, kafkaParams)

        val kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, consumerStrategy)

        ssc
      }

      Spark throws an error and the application crashes once OffsetOutOfRangeException is thrown:
      WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 : org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions:

      {test-2=127287}

      at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
      at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
      at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:98)
      at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
      at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
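
      The exception above fires when the consumer asks for an offset that retention has already removed and no reset policy applies to that consumer. Note that spark-streaming-kafka-0-10 overrides auto.offset.reset to "none" for the cached consumers on executors, so the "earliest" setting in the snippet only governs the driver. A minimal pure-Scala sketch of that decision (the names OffsetRange and resolveOffset are illustrative, not the Kafka client's actual API):

```scala
// Sketch of the offset-range check behind OffsetOutOfRangeException.
// OffsetRange/resolveOffset are hypothetical names for illustration only.
final case class OffsetRange(earliest: Long, latest: Long)

def resolveOffset(requested: Long, available: OffsetRange, resetPolicy: String): Long =
  if (requested >= available.earliest && requested <= available.latest)
    requested                                        // still retained: consume as requested
  else resetPolicy match {
    case "earliest" => available.earliest            // reset to oldest retained offset
    case "latest"   => available.latest              // skip ahead to newest offset
    case _          => throw new IllegalStateException(
      s"Offsets out of range with no configured reset policy: requested=$requested")
  }
```

      In this report's terms: the executor consumer for partition test-2 requested an offset below 127287 with an effective reset policy of "none", which lands in the failing branch and crashes the task.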


          People

            Assignee: Unassigned
            Reporter: twasti (Taukir)
