SPARK-21999

ConcurrentModificationException - Spark Streaming
Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Won't Fix
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      Hi,

      I am using Spark Streaming v2.1.0 with Kafka 0.8. I am intermittently getting a ConcurrentModificationException. When it occurs, Spark does not honor the specified value of spark.task.maxFailures; it aborts the current batch and fetches the next one, which results in lost data. The exception stack trace is listed below.
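
      For reference on how that setting is applied, here is a minimal sketch of a driver setup (the app name and batch interval are illustrative placeholders, not from my actual application):

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      // Minimal driver configuration; the values below are placeholders.
      val conf = new SparkConf()
        .setAppName("streaming-app")
        .set("spark.task.maxFailures", "8") // retries Spark should attempt before giving up on a task
      val ssc = new StreamingContext(conf, Seconds(5))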

      This instance of ConcurrentModificationException is similar to the issue at https://issues.apache.org/jira/browse/SPARK-17463, which was about serialization of accumulators in heartbeats. However, my Spark Streaming app does not use accumulators.
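
      The "Caused by" frames below point at java.util.ArrayList.writeObject, which throws ConcurrentModificationException when the list's modCount changes while the list is being written. The following standalone sketch reproduces that failure mode outside Spark (it is illustrative, not my app's code; the thread and loop bounds are arbitrary):

      import java.io.{ByteArrayOutputStream, ObjectOutputStream}

      object CmeRepro {
        def main(args: Array[String]): Unit = {
          val list = new java.util.ArrayList[Integer]()
          (1 to 10000).foreach(i => list.add(i))

          // Mutate the list from a second thread, the way a background
          // driver thread might touch shared state during job generation.
          val mutator = new Thread(new Runnable {
            override def run(): Unit = {
              while (true) { list.add(0); list.remove(list.size - 1) }
            }
          })
          mutator.setDaemon(true)
          mutator.start()

          // ArrayList.writeObject snapshots modCount, writes the elements,
          // then re-checks modCount and throws ConcurrentModificationException
          // if the list changed mid-write (the same frames as in the trace).
          try {
            for (_ <- 1 to 100000) {
              val out = new ObjectOutputStream(new ByteArrayOutputStream())
              out.writeObject(list)
              out.close()
            }
            println("no CME this run; the race is timing-dependent")
          } catch {
            case e: java.util.ConcurrentModificationException =>
              println(s"reproduced: $e")
          }
        }
      }

      The usual defenses against this race are to serialize a snapshot (copy) of such a list or to synchronize access to it.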

      The stack trace listed below occurred on the Spark master, in the Spark Streaming driver, at the time of the data loss.

      From the line of code in the first stack trace, can you tell which object Spark was trying to serialize? What is the root cause of this issue?

      Because this issue results in lost data as described above, could you have it fixed ASAP?

      Thanks.

      Michael N.,

      ----------------

      Stack trace from the Spark Streaming driver:
      ERROR JobScheduler:91: Error generating jobs for time 1505224930000 ms
      org.apache.spark.SparkException: Task not serializable
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
      at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
      at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:792)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
      at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:792)
      at org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
      at org.apache.spark.streaming.dstream.MapPartitionedDStream$$anonfun$compute$1.apply(MapPartitionedDStream.scala:37)
      at scala.Option.map(Option.scala:146)
      at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
      at scala.Option.orElse(Option.scala:289)
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
      at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
      at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      at scala.collection.immutable.List.map(List.scala:285)
      at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
      at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
      at scala.Option.orElse(Option.scala:289)
      at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
      at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
      at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
      at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
      at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
      at scala.util.Try$.apply(Try.scala:192)
      at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
      at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
      at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      Caused by: java.util.ConcurrentModificationException
      at java.util.ArrayList.writeObject(ArrayList.java:766)
      at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
      at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
      at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
      ... 60 more
      At 2017-09-12 07:02:10.029, the identical "Task not serializable" stack trace, with the same java.util.ConcurrentModificationException cause, was logged again.

            People

              Assignee: Unassigned
              Reporter: Michael N (michaeln_apache)
              Votes: 0
              Watchers: 6
