SPARK-27529: Spark Streaming consumer dies with kafka.common.OffsetOutOfRangeException


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 1.5.0
    • Fix Version/s: None
    • Component/s: DStreams

    Description

      We have a Spark Streaming consumer which, at a certain point, started consistently failing upon restart with the error below.

      Some details:

      • Spark version is 1.5.0.
      • Kafka version is 0.8.2.1 (2.10-0.8.2.1).
      • The topic is configured with: retention.ms=1471228928, max.message.bytes=100000000.
      • The consumer runs with auto.offset.reset=smallest (see the setup sketch after this list).
      • No checkpointing is currently enabled.
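
      For reference, the stream is set up roughly as follows. This is a simplified sketch, not our exact driver code; the broker list, topic name, batch interval, and per-partition body are placeholders, and the wiring into our real ProcessPartitionFunction is approximate:

      import java.util.HashMap;
      import java.util.HashSet;
      import java.util.Iterator;
      import java.util.Map;
      import java.util.Set;

      import kafka.serializer.StringDecoder;
      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.function.Function;
      import org.apache.spark.api.java.function.VoidFunction;
      import org.apache.spark.streaming.Durations;
      import org.apache.spark.streaming.api.java.JavaPairInputDStream;
      import org.apache.spark.streaming.api.java.JavaStreamingContext;
      import org.apache.spark.streaming.kafka.KafkaUtils;
      import scala.Tuple2;

      public class KafkaSparkStreamingDriverSketch {
        public static void main(String[] args) throws Exception {
          SparkConf conf = new SparkConf().setAppName("acme-kafka-consumer");
          // Batch interval is a placeholder; no checkpointing is configured anywhere.
          JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));

          // The only offset-related setting we pass: start from the earliest available offset.
          Map<String, String> kafkaParams = new HashMap<>();
          kafkaParams.put("metadata.broker.list", "broker1:9092");
          kafkaParams.put("auto.offset.reset", "smallest");

          Set<String> topics = new HashSet<>();
          topics.add("our-topic");

          // Direct (receiver-less) stream from the Kafka 0.8 integration shipped with Spark 1.5.0.
          final JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
              jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
              kafkaParams, topics);

          stream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, String> rdd) {
              rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, String>>>() {
                @Override
                public void call(Iterator<Tuple2<String, String>> records) {
                  // In the real job this is ProcessPartitionFunction, which processes each record.
                  while (records.hasNext()) {
                    records.next();
                  }
                }
              });
              return null;
            }
          });

          jssc.start();
          jssc.awaitTermination();
        }
      }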

      I don't see anything in the Spark or Kafka docs that explains why this is happening. The closest I found from googling around is this Cloudera post:

      https://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/
      
      Finally, I’ll repeat that any semantics beyond at-most-once require that you have sufficient log retention in Kafka. If you’re seeing things like OffsetOutOfRangeException, it’s probably because you underprovisioned Kafka storage, not because something’s wrong with Spark or Kafka.

      Also, looking at SPARK-12693 and SPARK-11693, I don't understand how the cause suggested there applies to us:

      You've under-provisioned Kafka storage and / or Spark compute capacity.
      The result is that data is being deleted before it has been processed.

      All we're trying to do is start the consumer and consume from the topic from the earliest available offset. Why would we not be able to do that? How can the offsets be out of range if we're saying, just read from the earliest available?

      Since we have retention.ms set to 1 year and we created the topic just a few weeks ago, I would not expect Kafka to be deleting anything while we're consuming.

      I'd like to understand the actual cause of this error. Any recommendations on a workaround would be appreciated.
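
      To help pin this down on our side, one thing we can do is query the broker directly for the earliest and latest available offsets of each partition and compare them against the offset ranges the failing tasks request. Below is a rough sketch using the Kafka 0.8 SimpleConsumer API; the broker host, topic, and partition are placeholders, and a real check would loop over all partitions and their leaders:

      import java.util.HashMap;
      import java.util.Map;

      import kafka.api.PartitionOffsetRequestInfo;
      import kafka.common.TopicAndPartition;
      import kafka.javaapi.OffsetResponse;
      import kafka.javaapi.consumer.SimpleConsumer;

      public class OffsetRangeProbe {

        public static void main(String[] args) {
          SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "offset-probe");
          TopicAndPartition tp = new TopicAndPartition("our-topic", 0);
          try {
            // EarliestTime() / LatestTime() ask the broker for the oldest and newest offsets it still has.
            System.out.println("earliest=" + fetchOffset(consumer, tp, kafka.api.OffsetRequest.EarliestTime()));
            System.out.println("latest=" + fetchOffset(consumer, tp, kafka.api.OffsetRequest.LatestTime()));
          } finally {
            consumer.close();
          }
        }

        // Issues an OffsetRequest for a single offset at the given "time" marker and returns it.
        private static long fetchOffset(SimpleConsumer consumer, TopicAndPartition tp, long time) {
          Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
          requestInfo.put(tp, new PartitionOffsetRequestInfo(time, 1));
          kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
              requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "offset-probe");
          OffsetResponse response = consumer.getOffsetsBefore(request);
          return response.offsets(tp.topic(), tp.partition())[0];
        }
      }

      If the broker's earliest offset turns out to be larger than the offsets the failing tasks are trying to fetch, that would mean segments are being removed despite the retention settings; if not, I really don't see where the exception can come from.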

      Stack traces:

      2019-04-19 11:35:17,147 ERROR org.apache.spark.scheduler.TaskSetManager: Task 10 in stage 147.0 failed 4 times; aborting job
      2019-04-19 11:35:17,160 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 1555682554000 ms.0
      org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 147.0 failed 4 times, most recent failure: Lost task 10.3 in stage 147.0 (TID 2368, 10.150.0.58): kafka.common.OffsetOutOfRangeException
      at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
      at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
      at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
      at java.lang.Class.newInstance(Class.java:442)
      at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
      at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184)
      at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
      at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
      at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
      at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
      at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
      at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:69)
      at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:24)
      at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222)
      at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222)
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
      at org.apache.spark.scheduler.Task.run(Task.scala:88)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)

      Driver stacktrace:
      
      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at scala.Option.foreach(Option.scala:236) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:222) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:47) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:218) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:207) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at scala.util.Try$.apply(Try.scala:161) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_201]
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_201]
      at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]
      Caused by: kafka.common.OffsetOutOfRangeException
      at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_201]
      at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_201]
      at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_201]
      at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_201]
      at java.lang.Class.newInstance(Class.java:442) ~[?:1.8.0_201]
      at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:69) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:24) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.scheduler.Task.run(Task.scala:88) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      ... 3 more

People

    Assignee: Unassigned
    Reporter: Dmitry Goldenberg (dgoldenberg123)
    Votes: 0
    Watchers: 3
