Spark / SPARK-28927

Improve error for ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.2.1
    • Fix Version/s: 3.0.0
    • Component/s: ML
    • Labels: None

    Description

      The stack trace is below:

      19/08/28 07:00:40 WARN Executor task launch worker for task 325074 BlockManager: Block rdd_10916_493 could not be removed as it was not found on disk or in memory
      19/08/28 07:00:41 ERROR Executor task launch worker for task 325074 Executor: Exception in task 3.0 in stage 347.1 (TID 325074)
      java.lang.ArrayIndexOutOfBoundsException: 6741
        at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
        at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

      This exception occurs intermittently. We also found that the AUC metric, computed from the inner product of the user factors and the item factors, was not stable across runs with the same dataset and configuration. AUC varied from 0.60 to 0.67, which is too much variation for a production environment.
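      For readers unfamiliar with the evaluation described here, the following is a minimal sketch of scoring a (user, item) pair by the inner product of its factor vectors and computing AUC over the scored pairs. The names `AucSketch`, `dot`, and `auc` are hypothetical and not taken from the reporter's code; the pairwise AUC definition used (probability that a positive outscores a negative, ties counted as 0.5) is an assumption about how the metric was computed, and the quadratic loop is only suitable for small samples.

```scala
object AucSketch {
  // Inner product of a user factor vector and an item factor vector.
  def dot(u: Array[Float], v: Array[Float]): Float = {
    require(u.length == v.length, "factor vectors must have the same rank")
    var sum = 0.0f
    var i = 0
    while (i < u.length) {
      sum += u(i) * v(i)
      i += 1
    }
    sum
  }

  // AUC over (score, isPositive) pairs: the fraction of (positive, negative)
  // pairs in which the positive has the higher score; a tie counts as 0.5.
  // This is O(P * N), so it is meant for small evaluation samples only.
  def auc(scored: Seq[(Float, Boolean)]): Double = {
    val pos = scored.collect { case (s, true) => s }
    val neg = scored.collect { case (s, false) => s }
    require(pos.nonEmpty && neg.nonEmpty, "need at least one positive and one negative")
    val wins = for (p <- pos; n <- neg) yield {
      if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    }
    wins.sum / (pos.size.toLong * neg.size)
  }
}
```

      With a user vector `Array(1f, 0f)`, a positive item `Array(1f, 0f)` and a negative item `Array(0f, 1f)`, the scores are 1.0 and 0.0, so `auc` returns 1.0 for that pair.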

      Dataset size: ~12 billion ratings
      Here is our code:

      val hivedata = sc.sql(sqltext).select("id", "dpid", "score", "tag")
                  .repartition(6000).persist(StorageLevel.MEMORY_AND_DISK_SER)
      val zeroValueArrItem = ArrayBuffer[(String, Int)]()
      val predataItem = hivedata.
        map(r => (r.getString(0), (r.getString(1), r.getInt(2)))).rdd.
        aggregateByKey(zeroValueArrItem, 6000)((a, b) => a += b, (a, b) => a ++ b).
        zipWithIndex().
        setName(predataItemName).
        persist(StorageLevel.MEMORY_AND_DISK_SER)
      val zeroValueArr = ArrayBuffer[(Int, Int)]()
      val predataUser = predataItem.
        flatMap(r => r._1._2.map(y => (y._1, (r._2.toInt, y._2)))).
        aggregateByKey(zeroValueArr, 6000)((a, b) => a += b, (a, b) => a ++ b).
        zipWithIndex().setName(predataUserName).persist(StorageLevel.MEMORY_AND_DISK_SER)
      val trainData = predataUser.flatMap(x => x._1._2.map(y => (x._2.toInt, y._1, y._2.toFloat)))
        .setName(trainDataName).persist(StorageLevel.MEMORY_AND_DISK_SER)
      case class ALSData(user:Int, item:Int, rating:Float) extends Serializable
      val ratingData = trainData.map(x => ALSData(x._1, x._2, x._3)).toDF()
      val als = new ALS
      val paramMap = ParamMap(als.alpha -> 25000).
        put(als.checkpointInterval, 5).
        put(als.implicitPrefs, true).
        put(als.itemCol, "item").
        put(als.maxIter, 60).
        put(als.nonnegative, false).
        put(als.numItemBlocks, 600).
        put(als.numUserBlocks, 600).
        put(als.regParam, 4.5).
        put(als.rank, 25).
        put(als.userCol, "user")
      als.fit(ratingData, paramMap)
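
      The description does not name a root cause. One assumption worth checking is that the integer ids produced by `zipWithIndex()` are not stable when a partition is recomputed: the Spark API documentation notes that the index may change on re-evaluation when element order within a partition is not guaranteed, and the log above shows a retried stage attempt (stage 347.1). The sketch below illustrates that idea in plain Scala, without Spark. `StableIds`, `idsByArrival`, and `idsBySortedKey` are hypothetical names, and sorting is only one possible way to pin the order.

```scala
object StableIds {
  // Ids assigned in arrival order: the id of a key depends on the order
  // in which the keys happen to be produced.
  def idsByArrival(keys: Seq[String]): Map[String, Int] =
    keys.zipWithIndex.toMap

  // Ids assigned after sorting: the same set of keys always maps to the
  // same ids, regardless of the order in which they were produced.
  def idsBySortedKey(keys: Seq[String]): Map[String, Int] =
    keys.sorted.zipWithIndex.toMap
}

object StableIdsDemo extends App {
  // Two hypothetical evaluations of the same data that differ only in order.
  val attempt1 = Seq("a", "b", "c")
  val attempt2 = Seq("c", "a", "b")

  // Arrival-order ids disagree between the two attempts.
  println(StableIds.idsByArrival(attempt1) == StableIds.idsByArrival(attempt2))
  // Sorted-order ids agree between the two attempts.
  println(StableIds.idsBySortedKey(attempt1) == StableIds.idsBySortedKey(attempt2))
}
```

      If this assumption holds for the pipeline above, ratings built from one evaluation of the id mapping could reference ids that differ from those seen in a later recomputation. Whether that explains the exception or the AUC variation in this report is not confirmed by the text here.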

          People

            Assignee: L. C. Hsieh (viirya)
            Reporter: Qiang Wang (JerryHouse)