  Spark / SPARK-29818 Missing persist on RDD / SPARK-29817

Missing persist on docs in mllib.clustering.LDAOptimizer.initialize


    Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.4.3
    • Fix Version/s: None
    • Component/s: MLlib
    • Labels: None

      Description

      The RDD docs in mllib.clustering.LDAOptimizer.initialize is consumed by two downstream computations: the graph construction that depends on verticesTMP.reduceByKey, and docs.take(1). Because docs is not persisted, its lineage is recomputed for the second use; it should be persisted.

        override private[clustering] def initialize(
            docs: RDD[(Long, Vector)],
            lda: LDA): EMLDAOptimizer = {
            ...
          val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) =>
            // Add edges for terms with non-zero counts.
            termCounts.asBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) =>
              Edge(docID, term2index(term), cnt)
            }
          }
          // Create vertices.
          // Initially, we use random soft assignments of tokens to topics (random gamma).
          val docTermVertices: RDD[(VertexId, TopicCounts)] = {
            val verticesTMP: RDD[(VertexId, TopicCounts)] =
              edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
                val random = new Random(partIndex + randomSeed)
                partEdges.flatMap { edge =>
                  val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)
                  val sum = gamma * edge.attr
                  Seq((edge.srcId, sum), (edge.dstId, sum))
                }
              }
            verticesTMP.reduceByKey(_ + _) // RDD lineage: verticesTMP -> edges -> docs. First use of docs
          }
          // Partition such that edges are grouped by document
          this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D)
          this.k = k
          this.vocabSize = docs.take(1).head._2.size // Second use of docs
          ...
        }

      This issue was reported by our tool CacheCheck, which dynamically detects misuses of the persist()/unpersist() API.
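      A minimal sketch of the suggested fix (illustrative, not a committed change): persist docs before its first use and release it once both dependent computations have run. StorageLevel would need to be imported from org.apache.spark.storage; the elided parts mirror the excerpt above.

        override private[clustering] def initialize(
            docs: RDD[(Long, Vector)],
            lda: LDA): EMLDAOptimizer = {
            ...
          // Cache docs before it is consumed twice: once through the
          // edges -> verticesTMP lineage when the graph is built, and
          // once by docs.take(1) below.
          docs.persist(StorageLevel.MEMORY_AND_DISK)
          val edges: RDD[Edge[TokenCount]] = docs.flatMap { ... }
            ...
          this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D)
          this.k = k
          this.vocabSize = docs.take(1).head._2.size
          // Graph construction is lazy, so in practice the unpersist should
          // wait until the graph lineage has actually been materialized.
          docs.unpersist()
            ...
        }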

            People

            • Assignee: Unassigned
            • Reporter: spark_cachecheck IcySanwitch
