[SPARK-3990] kryo.KryoException caused by ALS.trainImplicit in pyspark


Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.1.1, 1.2.0
    • Component/s: MLlib, PySpark
    • Environment: 5-slave cluster (m3.large) on AWS launched by spark-ec2; Linux; Python 2.6.8

    Description

      When we tried ALS.trainImplicit() in PySpark, it only worked for iterations = 1. Stranger still, the same code works fine in Scala (in several tests so far, ALS.trainImplicit in Scala has worked every time).

      For example, the following code:

      test.py
        from pyspark.mllib.recommendation import *

        # sc is the SparkContext provided by the pyspark shell
        r1 = (1, 1, 1.0)
        r2 = (1, 2, 2.0)
        r3 = (2, 1, 2.0)
        ratings = sc.parallelize([r1, r2, r3])

        # Fails with the default number of iterations (5); an explicit
        # iterations > 1, e.g. model = ALS.trainImplicit(ratings, 1, 2),
        # fails as well
        model = ALS.trainImplicit(ratings, 1)
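
      For comparison, the equivalent Scala code completes even with iterations > 1, as noted above. This is a minimal sketch assuming the spark-shell (which provides sc) and the standard MLlib ALS/Rating API:

      test.scala
        import org.apache.spark.mllib.recommendation.{ALS, Rating}

        val ratings = sc.parallelize(Seq(
          Rating(1, 1, 1.0),
          Rating(1, 2, 2.0),
          Rating(2, 1, 2.0)))

        // rank = 1 with iterations = 5 (the PySpark default): works in Scala
        val model = ALS.trainImplicit(ratings, 1, 5)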
      

      The PySpark code above fails the stage count at ALS.scala:314, with the following information:

      Error information provided by Ganglia:
      Job aborted due to stage failure: Task 6 in stage 90.0 failed 4 times, most recent failure: Lost task 6.3 in stage 90.0 (TID 484, ip-172-31-35-238.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
      Serialization trace:
      shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
              com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
              com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
              com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
              com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
              com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
              com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
              org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
              org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
              org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
              org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
              scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
              org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
              org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
              org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
              scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
              scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
              scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
              scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
              org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
              org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
              org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
              org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
              org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
              org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
              org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
              org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
              org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
              org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
              org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
              org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
              org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
              org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
              org.apache.spark.scheduler.Task.run(Task.scala:54)
              org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
              java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              java.lang.Thread.run(Thread.java:745)
      Driver stacktrace:
      

      The log of the slave that failed the task contains:

      Error information from the slave's log:
      14/10/17 13:20:54 ERROR executor.Executor: Exception in task 6.0 in stage 90.0 (TID 465)
      com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
      Serialization trace:
      shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
      	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
      	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
      	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
      	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
      	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
      	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
      	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
      	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
      	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
      	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
      	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
      	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
      	at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      	at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      	at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      	at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
      	at org.apache.spark.scheduler.Task.run(Task.scala:54)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
      	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
      	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
      	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
      	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
      	... 36 more
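
      For context (this explanation is not part of the original report): java.lang.ArrayStoreException is thrown by the JVM when an object is stored into an array whose runtime component type cannot hold it. The Caused by frames above show Kryo's ObjectArraySerializer populating the elements of OutLinkBlock.shouldSend, an object array whose component type is evidently not HashSet (Array[scala.collection.mutable.BitSet] in the 1.1 ALS source), with deserialized mutable.HashSet instances. A minimal Scala sketch of that mechanism:

      demo.scala
        import scala.collection.mutable

        // A BitSet[] array viewed through an erased Object[] reference,
        // roughly the position Kryo's ObjectArraySerializer is in while
        // filling in a deserialized object array.
        val typed: Array[mutable.BitSet] = new Array[mutable.BitSet](2)
        val erased: Array[AnyRef] = typed.asInstanceOf[Array[AnyRef]]

        erased(0) = mutable.BitSet(1, 2)   // fine: runtime type matches
        erased(1) = mutable.HashSet(1, 2)  // throws java.lang.ArrayStoreException

      That an element comes back as a HashSet at all suggests the Kryo class registrations differ between the serializing and deserializing sides of the PySpark-launched job; that is an inference, not something confirmed in this report.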
      


People

    Assignee: Xiangrui Meng (mengxr)
    Reporter: Gen TANG (gen)
    Votes: 0
    Watchers: 6

