SPARK-19680

Offsets out of range with no configured reset policy for partitions


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: DStreams

    Description

      I'm using Spark Streaming with Kafka to build a top list. I want to read all the messages in Kafka, so I set

      "auto.offset.reset" -> "earliest"

      Nevertheless, when I start the job on our Spark cluster it does not work; I get:

      Error:

      error.log
      	Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {SearchEvents-2=161803385}
      

      This looks wrong, because I did set the auto.offset.reset property.

      Setup:

      Kafka Parameter:

      Config.Scala
        def getDefaultKafkaReceiverParameter(properties: Properties):Map[String, Object] = {
          Map(
            "bootstrap.servers" -> properties.getProperty("kafka.bootstrap.servers"),
            "group.id" -> properties.getProperty("kafka.consumer.group"),
            "auto.offset.reset" -> "earliest",
            "spark.streaming.kafka.consumer.cache.enabled" -> "false",
            "enable.auto.commit" -> "false",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
        }
      

      Job:

      Job.Scala
        def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: Broadcast[KafkaSink[TopList]]): Unit = {
          getFilteredStream(stream.map(_.value()), windowDuration, slideDuration).foreachRDD(rdd => {
            val topList = new TopList
            topList.setCreated(new Date())
            topList.setTopListEntryList(rdd.take(TopListLength).toList)
            CurrentLogger.info("TopList length: " + topList.getTopListEntryList.size().toString)
            kafkaSink.value.send(SendToTopicName, topList)
            CurrentLogger.info("Last Run: " + System.currentTimeMillis())
          })
      
        }
      
        def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, slideDuration: Int): DStream[TopListEntry] = {
      
          val Mapper = MapperObject.readerFor[SearchEventDTO]
      
          result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
            .filter(s => s != null && s.getSearchRequest != null && s.getSearchRequest.getSearchParameters != null && s.getVertical == Vertical.BAP && s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
            .map(row => {
              val name = row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
              (name, new TopListEntry(name, 1, row.getResultCount))
            })
            .reduceByKeyAndWindow(
              (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + b.getMeanSearchHits),
              (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - b.getMeanSearchHits),
              Minutes(windowDuration),
              Seconds(slideDuration))
            .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
            .map(row => (row._2.getSearchCount, row._2))
            .transform(rdd => rdd.sortByKey(ascending = false))
            .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount, row._2.getMeanSearchHits / row._2.getSearchCount))
        }
      
        def main(properties: Properties): Unit = {
      
          val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
          val kafkaSink = sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
          val kafkaParams: Map[String, Object] = SparkUtil.getDefaultKafkaReceiverParameter(properties)
          val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(30))
      
          ssc.checkpoint("/home/spark/checkpoints")
          val adEventStream =
            KafkaUtils.createDirectStream[String, Array[Byte]](ssc, PreferConsistent, Subscribe[String, Array[Byte]](Array(ReadFromTopicName), kafkaParams))
      
          processSearchKeyWords(adEventStream, SparkUtil.getWindowDuration(properties), SparkUtil.getSlideDuration(properties), kafkaSink)
      
          ssc.start()
          ssc.awaitTermination()
      
        }
      

      As I saw in the KafkaUtils code:

      KafkaUtils.scala
          logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
          kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
      

      This means that as soon as one worker has a Kafka partition that can no longer be processed, because its offsets are no longer valid due to the retention policy, the streaming job will stop working.
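
      A minimal workaround sketch, not part of the original report: on the driver, ask Kafka for the earliest offsets that still exist and pass them explicitly to the Subscribe strategy, so that a fresh start of the job (whose executors run with auto.offset.reset forced to none) does not begin at an offset that retention has already deleted. This assumes a kafka-clients version that provides Consumer#beginningOffsets (0.10.1 or newer); the helper name earliestValidOffsets is made up for illustration, and this does not protect a job that falls behind retention while it is already running.

      WorkaroundSketch.Scala
        import scala.collection.JavaConverters._

        import org.apache.kafka.clients.consumer.KafkaConsumer
        import org.apache.kafka.common.TopicPartition

        // Query the broker for the earliest offset that still exists in every partition of the
        // given topic. The consumer is created only on the driver and closed again; it never
        // joins the consumer group because nothing is subscribed.
        def earliestValidOffsets(kafkaParams: Map[String, Object], topic: String): Map[TopicPartition, Long] = {
          val consumer = new KafkaConsumer[String, Array[Byte]](kafkaParams.asJava)
          try {
            val partitions = consumer.partitionsFor(topic).asScala
              .map(info => new TopicPartition(info.topic(), info.partition()))
            consumer.beginningOffsets(partitions.asJava).asScala
              .map { case (tp, offset) => tp -> offset.longValue() }
              .toMap
          } finally {
            consumer.close()
          }
        }

        // Usage when building the stream (ReadFromTopicName and kafkaParams as in the job above):
        //   val startOffsets = earliestValidOffsets(kafkaParams, ReadFromTopicName)
        //   val adEventStream = KafkaUtils.createDirectStream[String, Array[Byte]](
        //     ssc,
        //     PreferConsistent,
        //     Subscribe[String, Array[Byte]](Array(ReadFromTopicName), kafkaParams, startOffsets))
      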


          People

            Assignee: Unassigned
            Reporter: Schakmann Rene (rene.s)
            Votes: 12
            Watchers: 19
