Details
- Sub-task
- Status: Resolved
- Major
- Resolution: Duplicate
- 2.4.3
- None
- None
Description
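The RDD docs in mllib.clustering.LDAOptimizer is used by two actions: once through the lineage of verticesTMP.reduceByKey, and once directly by docs.take(1). Because it is not persisted, it is recomputed for the second use. It should be persisted.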
override private[clustering] def initialize(
    docs: RDD[(Long, Vector)],
    lda: LDA): EMLDAOptimizer = {
  ...
  val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) =>
    // Add edges for terms with non-zero counts.
    termCounts.asBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) =>
      Edge(docID, term2index(term), cnt)
    }
  }

  // Create vertices.
  // Initially, we use random soft assignments of tokens to topics (random gamma).
  val docTermVertices: RDD[(VertexId, TopicCounts)] = {
    val verticesTMP: RDD[(VertexId, TopicCounts)] =
      edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
        val random = new Random(partIndex + randomSeed)
        partEdges.flatMap { edge =>
          val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)
          val sum = gamma * edge.attr
          Seq((edge.srcId, sum), (edge.dstId, sum))
        }
      }
    verticesTMP.reduceByKey(_ + _) // RDD dependency: verticesTMP - edges - docs. First use docs
  }

  // Partition such that edges are grouped by document
  this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D)
  this.k = k
  this.vocabSize = docs.take(1).head._2.size // Second use docs
This issue was reported by our tool CacheCheck, which dynamically detects misuses of the persist()/unpersist() API.
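A minimal sketch of the kind of change being requested, not the patch that was actually applied: persist docs before it is first used and release it afterwards. The helper withPersisted, the object name, and the sample data are hypothetical and exist only for illustration; persist, unpersist, getStorageLevel, and StorageLevel.MEMORY_AND_DISK are standard RDD API. The choice of storage level is an assumption.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object PersistDocsSketch {
  // Hypothetical helper (not part of Spark): runs `body` with `docs` cached,
  // then releases the cache unless the caller had already persisted the RDD.
  def withPersisted[T, R](docs: RDD[T])(body: RDD[T] => R): R = {
    val persistedHere = docs.getStorageLevel == StorageLevel.NONE
    if (persistedHere) docs.persist(StorageLevel.MEMORY_AND_DISK)
    try body(docs)
    finally if (persistedHere) docs.unpersist()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("persist-docs-sketch")
    val sc = new SparkContext(conf)
    try {
      // Made-up stand-in for the (docID, termCounts) input of initialize().
      val docs: RDD[(Long, Vector)] = sc.parallelize(Seq(
        (0L, Vectors.dense(1.0, 0.0, 2.0)),
        (1L, Vectors.dense(0.0, 3.0, 1.0))
      ))

      val (nonZeroTokens, vocabSize) = withPersisted(docs) { cached =>
        // First action over docs: count the non-zero term counts.
        val tokens = cached.flatMap { case (_, termCounts) =>
          termCounts.toArray.filter(_ != 0.0)
        }.count()
        // Second action over docs: read the vocabulary size from one document.
        val size = cached.take(1).head._2.size
        (tokens, size)
      }
      println(s"non-zero tokens = $nonZeroTokens, vocabSize = $vocabSize")
    } finally {
      sc.stop()
    }
  }
}
```

In initialize() itself the first use of docs is lazy: reduceByKey only runs when the graph is materialized, so the cached copy would presumably have to stay alive until then rather than being released at the end of the method.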
Issue Links
- duplicates: SPARK-29818 Missing persist on RDD (Resolved)