Details
-
Improvement
-
Status: Resolved
-
Minor
-
Resolution: Won't Fix
-
3.0.0
-
None
-
None
Description
1. Improper cache in examples.SparkTC
The RDD edges should be cached because it is used multiple times in while loop. And it should be unpersisted before the last action tc.count(), because tc has been persisted.
On the other hand, many tc objects is cached in while loop but never uncached, which will waste memory.
val edges = tc.map(x => (x._2, x._1)) // Edges should be cached // This join is iterated until a fixed point is reached. var oldCount = 0L var nextCount = tc.count() do { oldCount = nextCount // Perform the join, obtaining an RDD of (y, (z, x)) pairs, // then project the result to obtain the new (x, z) paths. tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache() nextCount = tc.count() } while (nextCount != oldCount) println(s"TC has ${tc.count()} edges.")
2. Cache needed in examples.ml.LogisticRegressionSummary
The DataFrame fMeasure should be cached.
// Set the model threshold to maximize F-Measure val fMeasure = trainingSummary.fMeasureByThreshold // fMeasures should be cached val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0) val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure) .select("threshold").head().getDouble(0) lrModel.setThreshold(bestThreshold)
3. Cache needed in examples.sql.SparkSQLExample
val peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) // This RDD should be cahced .toDF() // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people") val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") teenagersDF.map(teenager => "Name: " + teenager(0)).show() teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
This issue is reported by our tool CacheCheck, which is used to dynamically detecting persist()/unpersist() api misuses.
Attachments
Issue Links
- is duplicated by
-
SPARK-29856 Conditional unnecessary persist on RDDs in ML algorithms
- Resolved
-
SPARK-29878 Improper cache strategies in GraphX
- Resolved
- links to