SPARK-22184

GraphX fails in case of insufficient memory and checkpoints enabled

    Details

    • Type: Bug
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: GraphX
    • Labels:
      None
    • Environment:

      spark 2.2.0
      scala 2.11

      Description

      GraphX fails with a FileNotFoundException when memory is insufficient and checkpointing is enabled.

      Here is the stack trace:

      Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-00000 does not exist
      java.io.FileNotFoundException: File file:/tmp/spark-90119695-a126-47b5-b047-d656fee10c17/9b16e2a9-6c80-45eb-8736-bbb6eb840146/rdd-28/part-00000 does not exist
      	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
      	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
      	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
      	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
      	at org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
      	at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
      	at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
      	at scala.Option.map(Option.scala:146)
      	at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
      	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
      ...
      

      Because GraphX relies heavily on cached RDDs, the issue is reproducible only when previously cached and checkpointed Vertex and Edge RDDs are evicted from memory and have to be read back from disk.

      For testing purposes, the following parameters may be set to emulate a low-memory environment:

      import org.apache.spark.SparkConf

      val sparkConf = new SparkConf()
        // checkpoint the graph every 2 Pregel iterations
        .set("spark.graphx.pregel.checkpointInterval", "2")
        // shrink the testing memory so that cached RDDs are evicted
        // and checkpointed RDDs have to be read from disk
        .set("spark.testing.reservedMemory", "128")
        .set("spark.testing.memory", "256")
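
A minimal sketch of how these settings might be wired into a standalone reproduction. This is not taken from the report: the local master, the application name, the checkpoint directory, the generated graph and the choice of connectedComponents() as a Pregel-based algorithm are all assumptions made for illustration. It has not been confirmed that this exact program triggers the FileNotFoundException.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.util.GraphGenerators

object Spark22184Repro {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("local[2]")            // assumption: run locally
      .setAppName("SPARK-22184-repro")  // hypothetical application name
      // settings from the report
      .set("spark.graphx.pregel.checkpointInterval", "2")
      .set("spark.testing.reservedMemory", "128")
      .set("spark.testing.memory", "256")

    val sc = new SparkContext(sparkConf)
    try {
      // Periodic checkpointing needs a checkpoint directory;
      // the path below is a hypothetical placeholder.
      sc.setCheckpointDir("/tmp/spark-22184-checkpoints")

      // Assumption: any Pregel-based algorithm on a generated graph
      // exercises the cached and checkpointed Vertex and Edge RDDs.
      val graph = GraphGenerators.logNormalGraph(sc, numVertices = 1000)
      val components = graph.connectedComponents()

      // Force evaluation of the result.
      println(components.vertices.count())
    } finally {
      sc.stop()
    }
  }
}
```

The graph size and the number of iterations the algorithm needs would likely have to be tuned so that several checkpoint intervals elapse before the job finishes.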
      

      This issue also includes SPARK-22150 and cannot be fixed until SPARK-22150 is fixed too.

              People

              • Assignee: Unassigned
              • Reporter: Sergey Zhemzhitsky (szhemzhitsky)