Spark / SPARK-29872

Improper cache strategy in examples


    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Won't Fix
    • Affects Version/s: 3.0.0
    • Fix Version/s: None
    • Component/s: Examples
    • Labels: None

      Description

      1. Improper cache strategy in examples.SparkTC
      The RDD edges should be cached, because it is used in every iteration of the while loop. It should also be unpersisted after the loop, before the last action tc.count(), since by then the final tc is persisted and edges is no longer needed.
      In addition, a new tc RDD is cached on every iteration but never unpersisted, which wastes memory. A corrected sketch follows the snippet below.

          val edges = tc.map(x => (x._2, x._1)) // edges is reused in every iteration but never cached
          // This join is iterated until a fixed point is reached.
          var oldCount = 0L
          var nextCount = tc.count()
          do {
            oldCount = nextCount
            // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
            // then project the result to obtain the new (x, z) paths.
            tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
            // The tc from the previous iteration is still cached here and is never unpersisted.
            nextCount = tc.count()
          } while (nextCount != oldCount)
          println(s"TC has ${tc.count()} edges.")
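
      A minimal corrected sketch, keeping the variable names of the original example (oldTc is a helper name introduced here, and the exact unpersist() placement is one reasonable choice, not the only valid fix):

          val edges = tc.map(x => (x._2, x._1)).cache() // reused in every iteration
          var oldCount = 0L
          var nextCount = tc.count()
          do {
            oldCount = nextCount
            val oldTc = tc
            tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
            nextCount = tc.count() // materializes the new tc before the old one is dropped
            oldTc.unpersist()      // release the previous iteration's cached RDD
          } while (nextCount != oldCount)
          edges.unpersist() // tc is persisted, so edges is no longer needed
          println(s"TC has ${tc.count()} edges.")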
      

      2. Cache needed in examples.ml.LogisticRegressionSummary
      The DataFrame fMeasure should be cached: two separate actions (the two head() calls) are run on it below, so without caching it is recomputed for each action. A corrected sketch follows the snippet.

          // Set the model threshold to maximize F-Measure
          val fMeasure = trainingSummary.fMeasureByThreshold // fMeasure should be cached
          val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
          val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure)
            .select("threshold").head().getDouble(0)
          lrModel.setThreshold(bestThreshold)
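
      A minimal corrected sketch, assuming the surrounding imports of the original example (import spark.implicits._ and org.apache.spark.sql.functions.max); the unpersist() placement is a suggestion:

          // Set the model threshold to maximize F-Measure
          val fMeasure = trainingSummary.fMeasureByThreshold.cache() // reused by two actions
          val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
          val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure)
            .select("threshold").head().getDouble(0)
          fMeasure.unpersist() // no longer needed once the threshold is extracted
          lrModel.setThreshold(bestThreshold)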
      

      3. Cache needed in examples.sql.SparkSQLExample
      The data behind peopleDF should be cached: three separate actions are run on teenagersDF below, and each one re-reads and re-parses people.txt. A corrected sketch follows the snippet.

          val peopleDF = spark.sparkContext
            .textFile("examples/src/main/resources/people.txt")
            .map(_.split(","))
            .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) // This RDD should be cached
            .toDF()
          // Register the DataFrame as a temporary view
          peopleDF.createOrReplaceTempView("people")
          val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
          teenagersDF.map(teenager => "Name: " + teenager(0)).show()
          teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
          implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
          teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
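
      A minimal corrected sketch. It caches the resulting DataFrame rather than the intermediate RDD, which serves the same purpose here, and assumes the surrounding imports of the original example (import spark.implicits._ and the Person case class):

          val peopleDF = spark.sparkContext
            .textFile("examples/src/main/resources/people.txt")
            .map(_.split(","))
            .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
            .toDF()
            .cache() // avoids re-reading and re-parsing people.txt for every action
          peopleDF.createOrReplaceTempView("people")
          val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
          teenagersDF.map(teenager => "Name: " + teenager(0)).show()
          teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
          implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
          teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
          peopleDF.unpersist() // release the cache after the last action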
      

      This issue was reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.

              People

              • Assignee: Unassigned
              • Reporter: spark_cachecheck IcySanwitch