Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Incomplete
Affects Version/s: 2.1.0
Fix Version/s: None
Description
I'm using Spark Streaming with Kafka to build a top list. I want to read all the messages in Kafka, so I set
"auto.offset.reset" -> "earliest"
Nevertheless, when I start the job on our Spark cluster it does not work and I get the following error:
Error:
error.log
Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {SearchEvents-2=161803385}
This seems wrong, because I did set the auto.offset.reset property.
Setup:
Kafka parameters:
Config.Scala
def getDefaultKafkaReceiverParameter(properties: Properties): Map[String, Object] = {
  Map(
    "bootstrap.servers" -> properties.getProperty("kafka.bootstrap.servers"),
    "group.id" -> properties.getProperty("kafka.consumer.group"),
    "auto.offset.reset" -> "earliest",
    "spark.streaming.kafka.consumer.cache.enabled" -> "false",
    "enable.auto.commit" -> "false",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
}
Job:
Job.Scala
def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: Broadcast[KafkaSink[TopList]]): Unit = {
  getFilteredStream(stream.map(_.value()), windowDuration, slideDuration).foreachRDD(rdd => {
    val topList = new TopList
    topList.setCreated(new Date())
    topList.setTopListEntryList(rdd.take(TopListLength).toList)
    CurrentLogger.info("TopList length: " + topList.getTopListEntryList.size().toString)
    kafkaSink.value.send(SendToTopicName, topList)
    CurrentLogger.info("Last Run: " + System.currentTimeMillis())
  })
}

def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, slideDuration: Int): DStream[TopListEntry] = {
  val Mapper = MapperObject.readerFor[SearchEventDTO]
  result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
    .filter(s => s != null && s.getSearchRequest != null && s.getSearchRequest.getSearchParameters != null
      && s.getVertical == Vertical.BAP
      && s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
    .map(row => {
      val name = row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
      (name, new TopListEntry(name, 1, row.getResultCount))
    })
    .reduceByKeyAndWindow(
      (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + b.getMeanSearchHits),
      (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - b.getMeanSearchHits),
      Minutes(windowDuration),
      Seconds(slideDuration))
    .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
    .map(row => (row._2.getSearchCount, row._2))
    .transform(rdd => rdd.sortByKey(ascending = false))
    .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount, row._2.getMeanSearchHits / row._2.getSearchCount))
}

def main(properties: Properties): Unit = {
  val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
  val kafkaSink = sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
  val kafkaParams: Map[String, Object] = SparkUtil.getDefaultKafkaReceiverParameter(properties)
  val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(30))
  ssc.checkpoint("/home/spark/checkpoints")
  val adEventStream =
    KafkaUtils.createDirectStream[String, Array[Byte]](ssc, PreferConsistent, Subscribe[String, Array[Byte]](Array(ReadFromTopicName), kafkaParams))
  processSearchKeyWords(adEventStream, SparkUtil.getWindowDuration(properties), SparkUtil.getSlideDuration(properties), kafkaSink)
  ssc.start()
  ssc.awaitTermination()
}
As I saw in the KafkaUtils code, the property is overridden for the executors:
KafkaUtils.scala
logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor") kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
This means that as soon as one worker holds a Kafka partition whose requested offset is no longer valid because of the retention policy, the streaming job stops working.
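A possible mitigation, sketched below under the assumption of Kafka clients 0.10.1+ (where KafkaConsumer.beginningOffsets is available): look up the earliest offsets that still exist on the driver with a plain KafkaConsumer and pass them explicitly to the Subscribe strategy, so the stream never starts from offsets that retention has already deleted. The helper earliestOffsets is illustrative, not part of the job above; ReadFromTopicName, kafkaParams and ssc are the values from the job. Note that the executor-side override to "none" is unchanged, so offsets lost mid-stream would still fail.
Workaround.Scala
import java.{util => ju}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Sketch only: look up the earliest offsets that still exist for every partition of the topic.
def earliestOffsets(topic: String, params: Map[String, Object]): Map[TopicPartition, Long] = {
  val consumer = new KafkaConsumer[String, Array[Byte]](params.asJava)
  try {
    val partitions = consumer.partitionsFor(topic).asScala.map(p => new TopicPartition(p.topic(), p.partition()))
    consumer.beginningOffsets(partitions.asJava).asScala.map { case (tp, o) => tp -> o.longValue() }.toMap
  } finally {
    consumer.close()
  }
}

// Driver side: seed the direct stream with explicit, currently valid start offsets
// instead of relying on auto.offset.reset.
val fromOffsets = earliestOffsets(ReadFromTopicName, kafkaParams)
val adEventStream = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc, PreferConsistent, Subscribe[String, Array[Byte]](Array(ReadFromTopicName), kafkaParams, fromOffsets))
Alternatively, increasing the topic retention or the processing throughput reduces the chance that executors request offsets that have already been deleted.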