Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-27567

Spark Streaming consumers (from Kafka) intermittently die with 'SparkException: Couldn't find leaders for Set'

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Not A Bug
    • 1.5.0
    • None
    • DStreams
    • None
    • GCP / 170~14.04.1-Ubuntu

    Description

      Some of our consumers intermittently die with the stack traces I'm including. Once restarted they run for a while then die again.

      I can't find any cohesive documentation on what this error means and how to go about troubleshooting it. Any help would be appreciated.

      Kafka version is 0.8.2.1 (2.10-0.8.2.1).

      Some of the errors seen look like this:

      ERROR org.apache.spark.scheduler.TaskSchedulerImpl: Lost executor 2 on 10.150.0.54: remote Rpc client disassociated

      Main error stack trace:

      2019-04-23 20:36:54,323 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error g
      
      enerating jobs for time 1556066214000 ms
      
      org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([hdfs.hbase.acme.attachments,49], [hdfs.hbase.acme.attachmen
      
      ts,63], [hdfs.hbase.acme.attachments,31], [hdfs.hbase.acme.attachments,9], [hdfs.hbase.acme.attachments,25], [hdfs.hbase.acme.attachments,55], [hdfs.hbase.acme.attachme
      
      nts,5], [hdfs.hbase.acme.attachments,37], [hdfs.hbase.acme.attachments,7], [hdfs.hbase.acme.attachments,47], [hdfs.hbase.acme.attachments,13], [hdfs.hbase.acme.attachme
      
      nts,43], [hdfs.hbase.acme.attachments,19], [hdfs.hbase.acme.attachments,15], [hdfs.hbase.acme.attachments,23], [hdfs.hbase.acme.attachments,53], [hdfs.hbase.acme.attach
      
      ments,1], [hdfs.hbase.acme.attachments,27], [hdfs.hbase.acme.attachments,57], [hdfs.hbase.acme.attachments,39], [hdfs.hbase.acme.attachments,11], [hdfs.hbase.acme.attac
      
      hments,29], [hdfs.hbase.acme.attachments,33], [hdfs.hbase.acme.attachments,35], [hdfs.hbase.acme.attachments,51], [hdfs.hbase.acme.attachments,45], [hdfs.hbase.acme.att
      
      achments,21], [hdfs.hbase.acme.attachments,3], [hdfs.hbase.acme.attachments,59], [hdfs.hbase.acme.attachments,41], [hdfs.hbase.acme.attachments,17], [hdfs.hbase.acme.at
      
      tachments,61]))
      
      at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.j
      
      ar:?]
      
      at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja
      
      r:1.5.0]
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja
      
      r:1.5.0]
      
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at scala.Option.orElse(Option.scala:257) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja
      
      r:1.5.0]
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) ~[spark-assembly-1.5.0-hadoop2.4.0.ja
      
      r:1.5.0]
      
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at scala.Option.orElse(Option.scala:257) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      
      at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      
      at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      
      at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at scala.util.Try$.apply(Try.scala:161) ~[acmedsc-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
      
      at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181) ~[spark-assembly-1.
      
      5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
      
      Exception in thread "main" org.apache.spark.SparkException: ArrayBuffer(org.apache.spark.SparkException: Couldn't find leaders for Set([hdfs.hbase.acme.attachments,49],
      
      [hdfs.hbase.acme.attachments,63], [hdfs.hbase.acme.attachments,31], [hdfs.hbase.acme.attachments,9], [hdfs.hbase.acme.attachments,25], [hdfs.hbase.acme.attachments,55]
      
      , [hdfs.hbase.acme.attachments,5], [hdfs.hbase.acme.attachments,37], [hdfs.hbase.acme.attachments,7], [hdfs.hbase.acme.attachments,47], [hdfs.hbase.acme.attachments,13]
      
      , [hdfs.hbase.acme.attachments,43], [hdfs.hbase.acme.attachments,19], [hdfs.hbase.acme.attachments,15], [hdfs.hbase.acme.attachments,23], [hdfs.hbase.acme.attachments,5
      
      3], [hdfs.hbase.acme.attachments,1], [hdfs.hbase.acme.attachments,27], [hdfs.hbase.acme.attachments,57], [hdfs.hbase.acme.attachments,39], [hdfs.hbase.acme.attachments,
      
      11], [hdfs.hbase.acme.attachments,29], [hdfs.hbase.acme.attachments,33], [hdfs.hbase.acme.attachments,35], [hdfs.hbase.acme.attachments,51], [hdfs.hbase.acme.attachment
      
      s,45], [hdfs.hbase.acme.attachments,21], [hdfs.hbase.acme.attachments,3], [hdfs.hbase.acme.attachments,59], [hdfs.hbase.acme.attachments,41], [hdfs.hbase.acme.attachmen
      
      ts,17], [hdfs.hbase.acme.attachments,61]))
      
      at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
      
      at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
      
      at scala.Option.orElse(Option.scala:257)
      
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
      
      at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
      
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
      
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
      
      at scala.Option.orElse(Option.scala:257)
      
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
      
      at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
      
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
      
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
      
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
      
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
      
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      
      at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
      
      at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
      
      at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
      
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
      
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
      
      at scala.util.Try$.apply(Try.scala:161)
      
      at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
      
      at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
      
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
      
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
      
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      
      2019-04-23 20:36:55,265 FATAL Unable to register shutdown hook because JVM is shutting down.
      
      [Stage 15597:=================================>                   (41 + 6) / 64]Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedExc
      
      eption
      
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
      
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      
      at java.lang.Thread.run(Thread.java:748)
      
      Caused by: java.lang.InterruptedException
      
      at java.lang.Object.wait(Native Method)
      
      at java.lang.Object.wait(Object.java:502)
      
      at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
      
      at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:559)
      
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
      
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
      
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
      
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
      
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898)
      
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896)
      
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
      
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
      
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
      
      at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896)
      
      at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:222)
      
      at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:47)
      
      at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:218)
      
      at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:207)
      
      at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315)
      
      at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315)
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
      
      at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
      
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
      
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
      
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
      
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
      
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
      
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
      
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
      
      at scala.util.Try$.apply(Try.scala:161)
      
      at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
      
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
      
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
      
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
      
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
      
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
      
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      
      ... 2 more
      
      
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            dgoldenberg123 Dmitry Goldenberg
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: