Spark / SPARK-14804

Graph VertexRDD/EdgeRDD checkpoint results in ClassCastException

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.6.1
    • Fix Version/s: 2.0.3, 2.1.1, 2.2.0
    • Component/s: GraphX
    • Labels:
      None

      Description

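          // Assumes sc.setCheckpointDir(...) was called and graph3 is an
          // existing Graph (its construction is not shown in the report).
          graph3.vertices.checkpoint()        // marks the VertexRDD for checkpointing
          graph3.vertices.count()             // first action materializes the checkpoint
          graph3.vertices.map(_._2).count()   // fails with the ClassCastException below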
      

      16/04/21 21:04:43 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 4.0 (TID 13, localhost): java.lang.ClassCastException: org.apache.spark.graphx.impl.ShippableVertexPartition cannot be cast to scala.Tuple2
      at com.xiaomi.infra.codelab.spark.Graph2$$anonfun$main$1.apply(Graph2.scala:80)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
      at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
      at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1161)
      at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1161)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1863)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1863)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
      at org.apache.spark.scheduler.Task.run(Task.scala:91)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:219)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:745)

      Relevant code from RDD and EdgeRDDImpl (VertexRDDImpl overrides isCheckpointed the same way):

        // org.apache.spark.rdd.RDD
        private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
        {
          if (isCheckpointedAndMaterialized) {
            // Assumes firstParent is the CheckpointRDD that replaced this RDD's lineage
            firstParent[T].iterator(split, context)
          } else {
            compute(split, context)
          }
        }

        private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed

        // org.apache.spark.graphx.impl.EdgeRDDImpl: delegates to the wrapped partitionsRDD
        override def isCheckpointed: Boolean = {
          firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed
        }
      
      

      For VertexRDD and EdgeRDD, the first parent is the wrapped partitionsRDD: RDD[ShippableVertexPartition[VD]] for VertexRDD and RDD[(PartitionID, EdgePartition[ED, VD])] for EdgeRDD.

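      1. Calling vertexRDD.checkpoint() checkpoints its partitionsRDD. Once an action has materialized that checkpoint, VertexRDD.isCheckpointedAndMaterialized returns true, because isCheckpointed is delegated to partitionsRDD.
      2. When vertexRDD.iterator is called next, computeOrReadCheckpoint sees that flag and calls firstParent.iterator. That first parent is not a CheckpointRDD; it is partitionsRDD itself, because the VertexRDD's own lineage was never replaced.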
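The two steps above can be reproduced without Spark in a small toy model. Everything in it is hypothetical (ToyRDD, ToyCheckpointRDD, ToyPartitionsRDD, ToyVertexRDD and ToyPartition are not Spark classes), and checkpointing is collapsed into one in-memory call instead of a job that writes to the checkpoint directory. It only mirrors the shape of computeOrReadCheckpoint and the delegated isCheckpointed override quoted above.

```scala
// Toy model (NOT Spark code): just enough lineage to show the mix-up.
abstract class ToyRDD[T] {
  def parents: List[ToyRDD[_]]          // firstParent is parents.head
  def compute(): Iterator[T]
  def isCheckpointed: Boolean
  def isCheckpointedAndMaterialized: Boolean = isCheckpointed

  // Unchecked cast, like RDD.firstParent[U]
  def firstParent[U]: ToyRDD[U] = parents.head.asInstanceOf[ToyRDD[U]]

  // Same shape as RDD.computeOrReadCheckpoint
  def iterator: Iterator[T] =
    if (isCheckpointedAndMaterialized) firstParent[T].iterator else compute()
}

// Hypothetical stand-in for CheckpointRDD: replays saved data.
final class ToyCheckpointRDD[T](data: Seq[T]) extends ToyRDD[T] {
  def parents: List[ToyRDD[_]] = Nil
  def compute(): Iterator[T] = data.iterator
  def isCheckpointed: Boolean = false
}

// Hypothetical stand-in for ShippableVertexPartition: holds many elements.
final case class ToyPartition(elems: Seq[(Long, String)])

// Plays the role of partitionsRDD. After checkpointing, its lineage is
// replaced by the checkpoint, so its firstParent really is the checkpoint.
final class ToyPartitionsRDD(parts: Seq[ToyPartition]) extends ToyRDD[ToyPartition] {
  private var saved: Option[ToyCheckpointRDD[ToyPartition]] = None
  def parents: List[ToyRDD[_]] = saved.toList
  def compute(): Iterator[ToyPartition] = parts.iterator
  def isCheckpointed: Boolean = saved.isDefined
  def checkpoint(): Unit = { saved = Some(new ToyCheckpointRDD(parts)) }
}

// Plays the role of VertexRDDImpl: wraps partitionsRDD and flattens it.
final class ToyVertexRDD(val partitionsRDD: ToyPartitionsRDD) extends ToyRDD[(Long, String)] {
  // The wrapper itself is never checkpointed, so this never changes.
  def parents: List[ToyRDD[_]] = List(partitionsRDD)
  def compute(): Iterator[(Long, String)] = partitionsRDD.iterator.flatMap(_.elems.iterator)
  // Delegated, like the isCheckpointed override quoted above.
  override def isCheckpointed: Boolean = partitionsRDD.isCheckpointed
  def checkpoint(): Unit = partitionsRDD.checkpoint()
}

object ToyDemo {
  private def newVertices(): ToyVertexRDD =
    new ToyVertexRDD(new ToyPartitionsRDD(Seq(
      ToyPartition(Seq(1L -> "a", 2L -> "b")),
      ToyPartition(Seq(3L -> "c")))))

  // Elements seen through the wrapper, viewed as Any so no tuple cast is forced.
  def elements(checkpointed: Boolean): List[Any] = {
    val v = newVertices()
    if (checkpointed) v.checkpoint()
    v.iterator.asInstanceOf[Iterator[Any]].toList
  }
}
```

Here elements(checkpointed = false) yields the three (id, attr) pairs, while elements(checkpointed = true) yields the two ToyPartition objects. In the model the root cause is visible directly: isCheckpointed is delegated to partitionsRDD, but the wrapper's own parents list never switches to a checkpoint.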

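      As a result, the returned iterator is an Iterator[ShippableVertexPartition[VD]] rather than the expected Iterator[(VertexId, VD)], so the map(_._2) closure in the user code fails with the ClassCastException shown above.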
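The exception surfaces in the user closure (Graph2.scala:80 in the trace) rather than inside Spark because of JVM type erasure: a mislabeled Iterator[(Long, String)] can be passed around and counted without any check, and the cast to scala.Tuple2 only happens when a closure such as _._2 receives an element. A minimal sketch, where FakePartition is a hypothetical stand-in for ShippableVertexPartition:

```scala
// Hypothetical stand-in for ShippableVertexPartition (not the Spark class).
final case class FakePartition(ids: Seq[Long])

object ErasureDemo {
  // Statically an Iterator[(Long, String)], but it actually holds a partition
  // object. The asInstanceOf is unchecked because of erasure and never fails.
  def mislabeled(): Iterator[(Long, String)] =
    Iterator[Any](FakePartition(Seq(1L, 2L))).asInstanceOf[Iterator[(Long, String)]]

  // Counting only calls next() and never treats an element as a tuple.
  def countOnly(): Int = mislabeled().size

  // The closure _._2 needs a scala.Tuple2, so the element is cast here and fails.
  def mapSecond(): Either[String, List[String]] =
    try Right(mislabeled().map(_._2).toList)
    catch { case _: ClassCastException => Left("ClassCastException") }
}
```

countOnly() returns 1 without error, while mapSecond() returns Left("ClassCastException"), matching the pattern in the stack trace where Utils.getIteratorSize drives the iterator and the user's anonymous function performs the failing cast.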


              People

              • Assignee:
                Tathagata Das (tdas)
                Reporter:
                SuYan
              • Votes:
                1
                Watchers:
                9
