SPARK-14804 (project: Spark)

Graph VertexRDD/EdgeRDD checkpoint results in ClassCastException


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.6.1
    • Fix Version/s: 2.0.3, 2.1.1, 2.2.0
    • Component/s: GraphX
    • Labels: None

    Description

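          // assumes sc.setCheckpointDir(...) has been called and graph3 is an existing Graph
          graph3.vertices.checkpoint()
          graph3.vertices.count()            // first job: materializes the checkpoint
          graph3.vertices.map(_._2).count()  // fails with the ClassCastException below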
      

      16/04/21 21:04:43 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 4.0 (TID 13, localhost): java.lang.ClassCastException: org.apache.spark.graphx.impl.ShippableVertexPartition cannot be cast to scala.Tuple2
      at com.xiaomi.infra.codelab.spark.Graph2$$anonfun$main$1.apply(Graph2.scala:80)
      at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
      at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
      at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1161)
      at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1161)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1863)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1863)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
      at org.apache.spark.scheduler.Task.run(Task.scala:91)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:219)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      at java.lang.Thread.run(Thread.java:745)

      The relevant code:

        // RDD.scala
        private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
        {
          if (isCheckpointedAndMaterialized) {
            // for an ordinary RDD, the first parent is now its CheckpointRDD
            firstParent[T].iterator(split, context)
          } else {
            compute(split, context)
          }
        }

        private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed

        // EdgeRDDImpl.scala (VertexRDDImpl has the analogous override)
        override def isCheckpointed: Boolean = {
          firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed
        }
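To make the flag logic easier to follow, here is a minimal, Spark-free sketch. The classes `ModelRDD` and `WrapperRDD` and the field `ownCheckpointMaterialized` are hypothetical stand-ins, not Spark APIs; they only mirror the shape of the code above, assuming that `isCheckpointed` by default reports an RDD's own checkpoint state. It shows the wrapper reporting itself as checkpointed and materialized even though only its first parent has a checkpoint. It can be run as a Scala script or in the REPL.

```scala
// Hypothetical model of the checkpoint flags above; these are not Spark classes.
class ModelRDD(val parent: Option[ModelRDD]) {
  // Hypothetical: true once *this* RDD's own checkpoint has been written.
  var ownCheckpointMaterialized: Boolean = false

  // Mirrors RDD.isCheckpointed: by default it reflects this RDD's own state.
  def isCheckpointed: Boolean = ownCheckpointMaterialized

  // Mirrors the alias shown above: delegates to the overridable isCheckpointed.
  def isCheckpointedAndMaterialized: Boolean = isCheckpointed

  def firstParent: ModelRDD = parent.get
}

// Mirrors VertexRDDImpl/EdgeRDDImpl: isCheckpointed is forwarded to the
// wrapped partitions RDD, which is also the first parent.
class WrapperRDD(partitions: ModelRDD) extends ModelRDD(Some(partitions)) {
  override def isCheckpointed: Boolean = firstParent.isCheckpointed
}

val partitionsRDD = new ModelRDD(None)
val vertices = new WrapperRDD(partitionsRDD)

// checkpoint() + count() on the wrapper only writes partitionsRDD's checkpoint.
partitionsRDD.ownCheckpointMaterialized = true

println(vertices.ownCheckpointMaterialized)     // false: no checkpoint of its own
println(vertices.isCheckpointedAndMaterialized) // true: answered by the parent
```

Because the alias dispatches to the overridden `isCheckpointed`, the wrapper takes the "read from checkpoint" branch even though its own first parent is not a checkpoint.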
      
      

      For VertexRDD and EdgeRDD, the first parent is the wrapped partitionsRDD: an RDD[ShippableVertexPartition[VD]] for VertexRDD and an RDD[(PartitionID, EdgePartition[ED, VD])] for EdgeRDD.

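      1. Calling vertexRDD.checkpoint() checkpoints its partitionsRDD. Once that checkpoint is materialized, VertexRDD.isCheckpointedAndMaterialized returns true through the isCheckpointed override.
      2. When vertexRDD.iterator is then called, computeOrReadCheckpoint sees isCheckpointedAndMaterialized = true and calls firstParent.iterator. That first parent is not a CheckpointRDD; it is partitionsRDD itself.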

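      So the returned iterator is an Iterator[ShippableVertexPartition[VD]] rather than the expected Iterator[(VertexId, VD)], and the cast fails as soon as user code such as map(_._2) treats an element as a tuple.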
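The type confusion itself can be reproduced without Spark. The sketch below is a hypothetical model, not Spark code: `Node`, `PartitionsNode`, `VerticesNode`, `VertexBatch` and the `useFix` switch are invented for illustration. `VertexBatch` plays the role of ShippableVertexPartition and `iterator` mirrors computeOrReadCheckpoint. Because generic types are erased at runtime, the wrong iterator is returned silently and the ClassCastException only appears when `map(_._2)` touches an element. The `useFix = true` path shows one way to avoid the mismatch in this model (consulting only the node's own checkpoint state instead of the overridable `isCheckpointed`); it is not necessarily how the issue was fixed in Spark.

```scala
// Hypothetical, Spark-free model of the failure path (not Spark code).

// Stand-in for ShippableVertexPartition: a batch of (id, attr) pairs.
final case class VertexBatch(entries: Vector[(Long, String)])

abstract class Node[T] {
  def parent: Option[Node[_]]
  def compute(): Iterator[T]

  // This node's own checkpoint state (hypothetical flag).
  def ownCheckpointMaterialized: Boolean = false

  // Overridable, like RDD.isCheckpointed.
  def isCheckpointed: Boolean = ownCheckpointMaterialized

  // Mirrors computeOrReadCheckpoint. useFix = false decides via the overridable
  // isCheckpointed (as in the code above); useFix = true uses own state only.
  def iterator(useFix: Boolean): Iterator[T] = {
    val readFromParent = if (useFix) ownCheckpointMaterialized else isCheckpointed
    if (readFromParent) {
      // Unchecked cast, like firstParent[T]: erasure means nothing fails here.
      parent.get.compute().asInstanceOf[Iterator[T]]
    } else {
      compute()
    }
  }
}

// Stand-in for partitionsRDD: one element per partition, checkpoint written.
class PartitionsNode(data: Vector[VertexBatch]) extends Node[VertexBatch] {
  def parent: Option[Node[_]] = None
  def compute(): Iterator[VertexBatch] = data.iterator
  override def ownCheckpointMaterialized: Boolean = true
}

// Stand-in for VertexRDD: exposes (id, attr) tuples, forwards isCheckpointed.
class VerticesNode(partitions: PartitionsNode) extends Node[(Long, String)] {
  def parent: Option[Node[_]] = Some(partitions)
  def compute(): Iterator[(Long, String)] = partitions.compute().flatMap(_.entries)
  override def isCheckpointed: Boolean = partitions.isCheckpointed
}

val partitions = new PartitionsNode(Vector(VertexBatch(Vector(1L -> "a", 2L -> "b"))))
val vertices = new VerticesNode(partitions)

// Own-state check: the wrapper has no checkpoint of its own, so it computes.
println(vertices.iterator(useFix = true).map(_._2).toList) // List(a, b)

// Overridable check: batches are handed back as if they were tuples, and the
// cast only fails once map(_._2) touches an element.
try {
  vertices.iterator(useFix = false).map(_._2).toList
  println("no exception")
} catch {
  case e: ClassCastException => println("ClassCastException: " + e.getMessage)
}
```

Keying the read-from-checkpoint decision on a node's own checkpoint state keeps a wrapper's `isCheckpointed` override from redirecting reads to a parent whose elements have a different type.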


          People

            Assignee: Tathagata Das (tdas)
            Reporter: SuYan
            Votes: 1
            Watchers: 9
