Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
0.9.0
Description
I get
When running the following code:
import org.apache.spark.mllib.recommendation.ALS import org.apache.spark.mllib.recommendation.Rating // Load and parse the data val data = sc.textFile("mllib/data/als/test.data") val ratings = data.map(_.split(',') match { case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble) }) // Build the recommendation model using ALS val numIterations = 20 val model = ALS.train(ratings, 1, 20, 0.01) // Evaluate the model on rating data val ratesAndPreds = ratings.map{ case Rating(user, item, rate) => (rate, model.predict(user, item))} val MSE = ratesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/ratesAndPreds.count
I get:
org.apache.spark.SparkException: Job aborted: Task 2.0:0 failed 1 times (most recent failure: Exception failure: scala.MatchError: null) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1024) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1024) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:617) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:617) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:617) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:205) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
The problem is deterministic. In addition ratesAndPreds has exactly 16 elements:
scala> ratesAndPreds.take(16) res1: Array[(Double, Double)] = Array((5.0,2.9995813468435633), (1.0,2.999581386744982), (5.0,2.9995813468435633), (1.0,2.999581386744982), (5.0,2.9995813468435633), (1.0,2.999581386744982), (5.0,2.9995813468435633), (1.0,2.999581386744982), (1.0,2.999581373444509), (5.0,2.999581413345928), (1.0,2.999581373444509), (5.0,2.999581413345928), (1.0,2.999581373444509), (5.0,2.999581413345928), (1.0,2.999581373444509), (5.0,2.999581413345928)) scala> ratesAndPreds.count()
Throws the exception again.