Spark / SPARK-3990

kryo.KryoException caused by ALS.trainImplicit in pyspark

Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.1.1, 1.2.0
    • Component/s: MLlib, PySpark
    • Environment: 5-slave cluster (m3.large) on AWS, launched by spark-ec2
      Linux
      Python 2.6.8

    Description

      When we call ALS.trainImplicit() from PySpark, it only works with iterations = 1. Stranger still, the same code works fine in Scala (in the several tests I have run so far, ALS.trainImplicit works in Scala).

      For example, take the following code:

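      test.py
        from pyspark import SparkContext
        from pyspark.mllib.recommendation import ALS

        # Needed when running as a script; in the pyspark shell, skip this and use the existing `sc`
        sc = SparkContext(appName="ALSTrainImplicitRepro")

        # (user, product, rating) triples
        r1 = (1, 1, 1.0)
        r2 = (1, 2, 2.0)
        r3 = (2, 1, 2.0)
        ratings = sc.parallelize([r1, r2, r3])

        # rank = 1; iterations defaults to 5. This fails, and so does
        # ALS.trainImplicit(ratings, 1, 2); only iterations = 1 succeeds.
        model = ALS.trainImplicit(ratings, 1)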
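      For context, trainImplicit treats each (user, product, rating) triple as implicit feedback. The MLlib documentation attributes its implicit ALS to Hu, Koren and Volinsky, "Collaborative Filtering for Implicit Feedback Datasets", where each observed value r becomes a binary preference p (1 if r > 0, else 0) and a confidence weight c = 1 + alpha * r. The pure-Python sketch below applies the paper's transform to the three test ratings. The function name implicit_preferences and the value alpha = 0.01 are assumptions for illustration only; this is not MLlib code, and MLlib's exact handling may differ between versions.

```python
# Sketch of the implicit-feedback transform from Hu, Koren and Volinsky (2008).
# Hypothetical helper for illustration; it is not part of the MLlib API.


def implicit_preferences(ratings, alpha):
    """Map (user, product, r) triples to (user, product, p, c) tuples."""
    result = []
    for user, product, r in ratings:
        p = 1.0 if r > 0 else 0.0  # any positive observation means "preferred"
        c = 1.0 + alpha * r        # larger observed values give more confidence in p
        result.append((user, product, p, c))
    return result


# The three ratings from test.py
ratings = [(1, 1, 1.0), (1, 2, 2.0), (2, 1, 2.0)]

# alpha = 0.01 is an assumed value for this illustration
for row in implicit_preferences(ratings, alpha=0.01):
    print(row)
```

      All three test ratings are positive, so every preference is 1 and only the confidence weights differ.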
      

      The job fails at the stage "count at ALS.scala:314" with the following error:

      Error information provided by Ganglia
      Job aborted due to stage failure: Task 6 in stage 90.0 failed 4 times, most recent failure: Lost task 6.3 in stage 90.0 (TID 484, ip-172-31-35-238.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
      Serialization trace:
      shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
              com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
              com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
              com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
              com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
              com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
              com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
              org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
              org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
              org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
              org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
              scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
              org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
              org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
              org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
              scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
              scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
              scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
              scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
              org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
              org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
              org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
              org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
              org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
              org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
              org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
              org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
              org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
              org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
              org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
              org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
              org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
              org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
              org.apache.spark.scheduler.Task.run(Task.scala:54)
              org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
              java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              java.lang.Thread.run(Thread.java:745)
      Driver stacktrace:
      

      The log of the slave that ran the failed task contains:

      Error information in the slave log
      14/10/17 13:20:54 ERROR executor.Executor: Exception in task 6.0 in stage 90.0 (TID 465)
      com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
      Serialization trace:
      shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
      	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
      	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
      	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
      	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
      	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
      	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
      	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
      	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
      	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
      	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
      	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
      	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
      	at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      	at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      	at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      	at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
      	at org.apache.spark.scheduler.Task.run(Task.scala:54)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
      	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
      	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
      	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
      	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
      	... 36 more
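      The "Caused by" frames show where the read breaks: Kryo's ObjectArraySerializer.read fills an object array with deserialized elements, and the JVM rejects any store whose runtime class is not assignable to the array's component type by throwing java.lang.ArrayStoreException. Here the rejected element was a scala.collection.mutable.HashSet being read into the shouldSend field of OutLinkBlock. The Python sketch below models only that store check. TypedArray, read_object_array, ArrayStoreError and the placeholder classes are hypothetical stand-ins, not Kryo, Scala or Spark code; the sketch illustrates the exception, not why the element and component types disagreed in this job.

```python
# Hypothetical, simplified model of filling a JVM object array during
# deserialization. The JVM checks every store into an object array and throws
# ArrayStoreException when the element's runtime class is not assignable to
# the array's component type; TypedArray imitates that check.


class ArrayStoreError(TypeError):
    """Python stand-in for java.lang.ArrayStoreException."""


class TypedArray:
    """Fixed-length array that rejects elements of the wrong runtime type."""

    def __init__(self, component_type, length):
        self.component_type = component_type
        self._items = [None] * length

    def __setitem__(self, index, value):
        # null (None here) is always storable in a JVM object array
        if value is not None and not isinstance(value, self.component_type):
            raise ArrayStoreError(type(value).__name__)
        self._items[index] = value

    def __getitem__(self, index):
        return self._items[index]

    def __len__(self):
        return len(self._items)


def read_object_array(component_type, elements):
    """Allocate an array of component_type, then store each element in turn."""
    array = TypedArray(component_type, len(elements))
    for i, element in enumerate(elements):
        array[i] = element  # the store check happens here
    return array


# Placeholder classes: ExpectedElement stands for whatever component type the
# array was created with; HashSet stands for scala.collection.mutable.HashSet.
class ExpectedElement:
    pass


class HashSet:
    pass


ok = read_object_array(ExpectedElement, [ExpectedElement(), None])
try:
    read_object_array(ExpectedElement, [HashSet()])
except ArrayStoreError as e:
    print("ArrayStoreException:", e)
```

      Storing matching elements or None succeeds; the first mismatched element aborts the whole read, which Kryo then wraps in a KryoException with the serialization trace shown above.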
      

            People

              Xiangrui Meng (mengxr)
              Gen TANG (gen)
              Xiangrui Meng
              Votes: 0
              Watchers: 6
