SPARK-7002

Persist on RDD fails the second time if the action is called on a child RDD without showing a FAILED message


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Incomplete
    • Affects Version/s: 1.3.0
    • Fix Version/s: None
    • Component/s: Spark Core
    • Environment:
      Platform: Power8
      OS: Ubuntu 14.10
      Java: java-8-openjdk-ppc64el

    Description

      The main issue: persisting an RDD fails the second time (after an unpersist) if the action is called on a child RDD, and no FAILED message is shown. This is pointed out at 2) in the trace below.
      A secondary issue: toDebugString on a child RDD does not show that the parent RDD is [Disk Serialized 1x Replicated]. This is pointed out at 1).

      Note: I persist to disk (DISK_ONLY) so that I can check whether the RDD is physically stored, rather than relying solely on a missing line in .toDebugString (see the comments in the trace).
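
The physical-storage check described in the note can also be approximated from inside the shell. The following is a minimal sketch, not part of the original report: `cachedPartitionCount` and `isMarkedPersistent` are hypothetical helper names, and it assumes that `SparkContext.getRDDStorageInfo` (a developer API) only lists RDDs that have at least one materialized block.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical helper: number of partitions of `rdd` that the block manager
// currently reports as stored. Falls back to 0 when the RDD is not listed.
def cachedPartitionCount(sc: SparkContext, rdd: RDD[_]): Int =
  sc.getRDDStorageInfo
    .find(_.id == rdd.id)
    .map(_.numCachedPartitions)
    .getOrElse(0)

// Hypothetical helper: true when persist() has been called on `rdd`,
// regardless of whether any block was actually written.
def isMarkedPersistent(rdd: RDD[_]): Boolean =
  rdd.getStorageLevel != StorageLevel.NONE
```

In spark-shell, calling `cachedPartitionCount(sc, rdd2)` after each `collect()` would give a second signal next to the find command: an RDD that is marked persistent but reports no cached partitions corresponds to the state described at 2).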

      scala> val rdd1 = sc.parallelize(List(1,2,3))
      scala> val rdd2 = rdd1.map(x => (x,x+1))
      scala> val rdd3 = rdd2.reduceByKey( (x,y) => x+y)
      scala> import org.apache.spark.storage.StorageLevel
      scala> rdd2.persist(StorageLevel.DISK_ONLY)
      scala> rdd3.collect()
      scala> rdd2.toDebugString
      res4: String = 
      (100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x Replicated]
        |        CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
        |   ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
      scala> rdd3.toDebugString
      res5: String = 
      (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
        +-(100) MapPartitionsRDD[1] at map at <console>:23 []
            |       CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
            |   ParallelCollectionRDD[0] at parallelize at <console>:21 []
      // 1) rdd3 does not show that the other RDDs are [Disk Serialized 1x Replicated], but the data is on disk. This is verified by:
      // a) the line starting with CachedPartitions
      // b) a find in spark_local_dir: "find . -name "*" | grep rdd" returns "./spark-b39bcf9b-e7d7-4284-bdd2-1be7ac3cacef/blockmgr-4f4c0b1c-b47a-4972-b364-7179ea6e0873/1f/rdd_4_*", where * stands for each of the partitions
      scala> rdd2.unpersist()
      scala> rdd2.toDebugString
      res8: String = 
      (100) MapPartitionsRDD[1] at map at <console>:23 []
        |   ParallelCollectionRDD[0] at parallelize at <console>:21 []
      scala> rdd3.toDebugString
      res9: String = 
      (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
        +-(100) MapPartitionsRDD[1] at map at <console>:23 []
            |   ParallelCollectionRDD[0] at parallelize at <console>:21 []
      // successfully unpersisted, also not visible on disk
      scala> rdd2.persist(StorageLevel.DISK_ONLY)
      scala> rdd3.collect()
      scala> rdd2.toDebugString
      res18: String = 
      (100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x Replicated]
        |   ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
      scala> rdd3.toDebugString
      res19: String = 
      (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
        +-(100) MapPartitionsRDD[1] at map at <console>:23 []
            |   ParallelCollectionRDD[0] at parallelize at <console>:21 []
      // 2) The data is not visible on disk through the find command mentioned previously, and it is also absent from toDebugString (no line starting with CachedPartitions, even though [Disk Serialized 1x Replicated] is shown). It does work when you call the action on the RDD itself:
      scala> rdd2.collect()
      scala> rdd2.toDebugString
      res21: String = 
      (100) MapPartitionsRDD[1] at map at <console>:23 [Disk Serialized 1x Replicated]
        |        CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
        |   ParallelCollectionRDD[0] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
      scala> rdd3.toDebugString
      res22: String = 
      (100) ShuffledRDD[2] at reduceByKey at <console>:25 []
        +-(100) MapPartitionsRDD[1] at map at <console>:23 []
            |       CachedPartitions: 100; MemorySize: 0.0 B; TachyonSize: 0.0 B; DiskSize: 802.0 B
            |   ParallelCollectionRDD[0] at parallelize at <console>:21 []
      // Data appears on disk again (using the find command mentioned previously), and the line with CachedPartitions is back in .toDebugString
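
The trace relies on reading toDebugString by eye: a storage level in brackets on the first line, plus a CachedPartitions line when data was actually written. A minimal sketch of that check in plain Scala follows. `DebugStringCheck` and its methods are hypothetical names introduced here, not Spark APIs, and the parsing assumes the output format shown in the trace above.

```scala
// Hypothetical helper, not part of Spark: inspects a toDebugString dump.
object DebugStringCheck {
  private val Cached = """CachedPartitions:\s*(\d+)""".r

  // Cached-partition count from the first CachedPartitions line, if any.
  def cachedPartitions(debug: String): Option[Int] =
    Cached.findFirstMatchIn(debug).map(_.group(1).toInt)

  // True when the first non-empty line ends with a non-empty [...] storage level.
  def storageLevelShown(debug: String): Boolean = {
    val head = debug.split("\n").map(_.trim).find(_.nonEmpty).getOrElse("")
    head.endsWith("]") && !head.endsWith("[]")
  }

  // The state reported at 2): marked as persisted, but nothing materialized.
  def markedButNotMaterialized(debug: String): Boolean =
    storageLevelShown(debug) && cachedPartitions(debug).isEmpty
}
```

In spark-shell this could be called as `DebugStringCheck.markedButNotMaterialized(rdd2.toDebugString)` after each `collect()`. It only automates the visual check, so it inherits the limitation noted at 1): on a child RDD the parent's storage level is not printed, and only the CachedPartitions line is informative.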
      

      People

        Assignee: Unassigned
        Reporter: Tom Hubregtsen (thubregtsen)