Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-21999

ConcurrentModificationException - Spark Streaming

Rank to TopRank to BottomAttach filesAttach ScreenshotBulk Copy AttachmentsBulk Move AttachmentsVotersWatch issueWatchersCreate sub-taskConvert to sub-taskLinkCloneLabelsUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Critical
    • Resolution: Won't Fix
    • 2.1.0
    • None
    • Spark Core
    • None

    Description

      Hi,

      I am using Spark Streaming v2.1.0 with Kafka 0.8. I am getting ConcurrentModificationException intermittently. When it occurs, Spark does not honor the specified value of spark.task.maxFailures. So Spark aborts the current batch and fetch the next batch, so it results in lost data. Its exception stack is listed below.

      This instance of ConcurrentModificationException is similar to the issue at https://issues.apache.org/jira/browse/SPARK-17463, which was about Serialization of accumulators in heartbeats. However, my Spark stream app does not use accumulators.

      The stack trace listed below occurred on the Spark master in Spark streaming driver at the time of data loss.

      From the line of code in the first stack trace, can you tell which object Spark was trying to serialize ? What is the root cause for this issue ?

      Because this issue results in lost data as described above, could you have this issue fixed ASAP ?

      Thanks.

      Michael N.,

      ----------------

      Stack trace of Spark Streaming driver
      ERROR JobScheduler:91: Error generating jobs for time 1505224930000 ms
      org.apache.spark.SparkException: Task not serializable
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
      at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
      at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
      at org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
      at org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
      at scala.Option.map(Option.scala:146)
      at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
      at scala.Option.orElse(Option.scala:289)
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
      at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
      at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      at scala.collection.immutable.List.map(List.scala:285)
      at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
      at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
      at scala.Option.orElse(Option.scala:289)
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
      at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
      at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
      at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
      at scala.util.Try$.apply(Try.scala:192)
      at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
      at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      Caused by: java.util.ConcurrentModificationException
      at java.util.ArrayList.writeObject(ArrayList.java:766)
      at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
      at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
      ... 60 more
      2017-09-12 07:02:10.029 ERROR
      org.apache.spark.SparkException: Task not serializable
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
      at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
      at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
      at org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
      at org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
      at scala.Option.map(Option.scala:146)
      at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
      at scala.Option.orElse(Option.scala:289)
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
      at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
      at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      at scala.collection.immutable.List.map(List.scala:285)
      at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
      at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
      at scala.Option.orElse(Option.scala:289)
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
      at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
      at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
      at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
      at scala.util.Try$.apply(Try.scala:192)
      at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
      at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      Caused by: java.util.ConcurrentModificationException
      at java.util.ArrayList.writeObject(ArrayList.java:766)
      at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
      at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
      ... 60 more

      Attachments

        Issue Links

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            Unassigned Unassigned
            michaeln_apache Michael N (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            6 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment