Description
I can reproduce the error consistently with a 2,000-record text file, each record having 1-5 terms, with checkpointing enabled. The problem appears to have been introduced by the resolution of SPARK-13355.
The EdgeRDD class seems to be lying about its type in a way that makes the RDD.mapPartitionsWithIndex method unusable when the EdgeRDD is referenced as an RDD of Edge elements.
val spark = SparkSession.builder.appName("lda").getOrCreate()
spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")
val data: RDD[(Long, Vector)] = // snip
data.setName("data").cache()
val lda = new LDA
val optimizer = new EMLDAOptimizer
lda.setOptimizer(optimizer)
  .setK(10)
  .setMaxIterations(400)
  .setAlpha(-1)
  .setBeta(-1)
  .setCheckpointInterval(7)
val ldaModel = lda.run(data)
16/10/16 23:53:54 WARN TaskSetManager: Lost task 3.0 in stage 348.0 (TID 1225, server2.domain): java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.spark.graphx.Edge
    at org.apache.spark.graphx.EdgeRDD$$anonfun$1$$anonfun$apply$1.apply(EdgeRDD.scala:107)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:107)
    at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:105)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:820)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:50)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)
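To illustrate what "lying about its type" can look like in general, here is a minimal sketch in plain Scala with no Spark dependency. It is not the EdgeRDD implementation: `Edge`, `LyingContainer`, and `ErasureDemo` are hypothetical stand-ins, and the sketch only assumes that the failure follows the usual erasure pattern, where a collection declared with element type Edge is actually backed by tuples and the mismatch surfaces only when a typed function touches an element.

```scala
// Hypothetical stand-ins for illustration only; none of these are Spark classes.
final case class Edge(srcId: Long, dstId: Long, attr: Int)

// Declares that it yields Edge elements, but the backing data is tuples.
class LyingContainer(storage: Seq[(Int, String)]) {
  // The cast succeeds at runtime because the element type is erased;
  // only the outer Iterator type is checked.
  def iterator: Iterator[Edge] =
    storage.iterator.asInstanceOf[Iterator[Edge]]
}

object ErasureDemo {
  // A consumer that trusts the declared element type.
  def sumSources(c: LyingContainer): Long =
    c.iterator.map(e => e.srcId).sum // the tuple is cast to Edge here and fails

  // Returns true if consuming the container raises ClassCastException.
  def failsWithClassCast(): Boolean =
    try {
      sumSources(new LyingContainer(Seq((0, "partition-0"))))
      false
    } catch {
      case _: ClassCastException => true
    }
}
```

In this sketch the error appears at the point of use rather than at the cast, which is the same shape as the trace above, where the exception is raised inside a function applied to each element.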
Attachments
Issue Links
- Is contained by
  - SPARK-14804 Graph vertexRDD/EdgeRDD checkpoint results ClassCastException (Resolved)
- is related to
  - SPARK-17265 EdgeRDD Difference throws an exception (Resolved)
- relates to
  - SPARK-14804 Graph vertexRDD/EdgeRDD checkpoint results ClassCastException (Resolved)
- links to