Spark / SPARK-3990

kryo.KryoException caused by ALS.trainImplicit in pyspark


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.1.1, 1.2.0
    • Component/s: MLlib, PySpark
    • Environment:

      5-slave cluster (m3.large) on AWS, launched by spark-ec2
      Linux
      Python 2.6.8

      Description

      When we run ALS.trainImplicit() in the PySpark environment, it only works with iterations = 1. Stranger still, the same code works fine in Scala (in several tests so far, ALS.trainImplicit has always worked in Scala).

      For example, consider the following code:

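      test.py
        from pyspark import SparkContext
        from pyspark.mllib.recommendation import ALS

        # Not needed in the pyspark shell, where sc already exists
        sc = SparkContext(appName="SPARK-3990 repro")

        # (user, product, rating) triples
        r1 = (1, 1, 1.0)
        r2 = (1, 2, 2.0)
        r3 = (2, 1, 2.0)
        ratings = sc.parallelize([r1, r2, r3])

        # rank = 1; fails with the default iterations = 5, and also with
        # an explicit iterations = 2: ALS.trainImplicit(ratings, 1, 2)
        model = ALS.trainImplicit(ratings, 1)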
      

      This causes the stage "count at ALS.scala:314" to fail with the following error:

      Error information provided by Ganglia
      Job aborted due to stage failure: Task 6 in stage 90.0 failed 4 times, most recent failure: Lost task 6.3 in stage 90.0 (TID 484, ip-172-31-35-238.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
      Serialization trace:
      shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
              com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
              com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
              com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
              com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
              com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
              com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
              org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
              org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
              org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
              org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
              scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
              org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
              org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
              org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
              scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
              scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
              scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
              scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
              org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
              org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
              org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
              org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
              org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
              org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
              org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
              org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
              org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
              org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
              org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
              org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
              org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
              org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
              org.apache.spark.scheduler.Task.run(Task.scala:54)
              org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
              java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              java.lang.Thread.run(Thread.java:745)
      Driver stacktrace:
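      The summary line packs several identifiers into one string: task 6 of stage 90.0 failed 4 times, and the last attempt (6.3, i.e. the fourth try of task 6) ran as TID 484 on ip-172-31-35-238.ec2.internal. When matching this against executor logs, it can help to split the line into fields. The following is a minimal sketch, assuming the Spark 1.1 message wording quoted above; `parse_stage_failure` and its regex are hypothetical helpers, not part of Spark.

```python
import re
from typing import Dict, Optional

# Hypothetical helper: the pattern assumes the Spark 1.1 wording of the
# "Job aborted due to stage failure" message quoted in this report.
_ABORT_RE = re.compile(
    r"Task (?P<task>\d+) in stage (?P<stage>[\d.]+) failed (?P<failures>\d+) times, "
    r"most recent failure: Lost task (?P<attempt>[\d.]+) in stage [\d.]+ "
    r"\(TID (?P<tid>\d+), (?P<host>[^)]+)\): "
    r"(?P<exception>[\w.$]+): (?P<message>.*)"
)


def parse_stage_failure(line: str) -> Optional[Dict[str, str]]:
    """Split a stage-failure summary into named fields, or return None."""
    match = _ABORT_RE.search(line)
    return match.groupdict() if match else None


SUMMARY = (
    "Job aborted due to stage failure: Task 6 in stage 90.0 failed 4 times, "
    "most recent failure: Lost task 6.3 in stage 90.0 "
    "(TID 484, ip-172-31-35-238.ec2.internal): "
    "com.esotericsoftware.kryo.KryoException: "
    "java.lang.ArrayStoreException: scala.collection.mutable.HashSet"
)

if __name__ == "__main__":
    info = parse_stage_failure(SUMMARY)
    # "attempt" is "<task index>.<attempt number>"; 6.3 is the fourth try.
    print(info["attempt"], info["tid"], info["host"])
    print(info["exception"])
```

      The regex is deliberately tied to this one message format; a different Spark version may word the summary differently, in which case the helper simply returns None.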
      

      The log of the slave on which the task failed contains:

      Error information in the slave log
      14/10/17 13:20:54 ERROR executor.Executor: Exception in task 6.0 in stage 90.0 (TID 465)
      com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
      Serialization trace:
      shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
      	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
      	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
      	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
      	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
      	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
      	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
      	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
      	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
      	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
      	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
      	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
      	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
      	at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      	at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      	at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      	at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
      	at org.apache.spark.scheduler.Task.run(Task.scala:54)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
      	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
      	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
      	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
      	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
      	... 36 more
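      The executor log adds what the driver summary omits: the "Caused by" chain ends in java.lang.ArrayStoreException: scala.collection.mutable.HashSet, thrown from Kryo's ObjectArraySerializer while reading the shouldSend field of OutLinkBlock. When comparing logs from several failing tasks, a small sketch like the following can pull out those two pieces. It assumes the log layout shown above; `root_cause`, `serialization_trace` and the `SAMPLE_LOG` excerpt are hypothetical, not Spark or Kryo APIs.

```python
import re
from typing import List, Optional, Tuple

# Assumed layout: "Caused by: <exception>: <message>" on its own line.
_CAUSED_BY = re.compile(r"^\s*Caused by: (?P<cause>.+)$", re.MULTILINE)
# Assumed layout of a Kryo "Serialization trace" entry: "<field> (<owner class>)".
_TRACE_ENTRY = re.compile(r"^\s*(?P<field>\w+) \((?P<owner>[\w.$]+)\)\s*$")


def root_cause(log: str) -> Optional[str]:
    """Return the innermost (last) "Caused by" exception line, if any."""
    causes = _CAUSED_BY.findall(log)
    return causes[-1] if causes else None


def serialization_trace(log: str) -> List[Tuple[str, str]]:
    """Return (field, owner class) pairs listed under the first "Serialization trace:"."""
    entries: List[Tuple[str, str]] = []
    lines = log.splitlines()
    for i, line in enumerate(lines):
        if line.strip() == "Serialization trace:":
            # Entries continue until the first line that is not "<field> (<class>)",
            # typically the first "at ..." stack frame.
            for follow in lines[i + 1:]:
                match = _TRACE_ENTRY.match(follow)
                if not match:
                    break
                entries.append((match.group("field"), match.group("owner")))
            break
    return entries


# Hypothetical excerpt trimmed from the executor log above.
SAMPLE_LOG = """\
com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
Serialization trace:
shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
\tat com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
Caused by: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
\tat com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
"""

if __name__ == "__main__":
    print(root_cause(SAMPLE_LOG))
    print(serialization_trace(SAMPLE_LOG))
```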
      

              People

              • Assignee: Xiangrui Meng (mengxr)
              • Reporter: Gen TANG (gen)
              • Shepherd: Xiangrui Meng
              • Votes: 0
              • Watchers: 6
