SPARK-29817 (sub-task of SPARK-29818: Missing persist on RDD)

Missing persist on docs in mllib.clustering.LDAOptimizer.initialize


Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.4.3
    • Fix Version/s: None
    • Component/s: MLlib
    • Labels: None

    Description

      The RDD docs in mllib.clustering.LDAOptimizer.initialize is consumed twice: once through verticesTMP.reduceByKey (via the edges RDD derived from docs) and once by docs.take(1). It should be persisted so the second use does not recompute it.

        override private[clustering] def initialize(
            docs: RDD[(Long, Vector)],
            lda: LDA): EMLDAOptimizer = {
            ...
          val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) =>
            // Add edges for terms with non-zero counts.
            termCounts.asBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) =>
              Edge(docID, term2index(term), cnt)
            }
          }
          // Create vertices.
          // Initially, we use random soft assignments of tokens to topics (random gamma).
          val docTermVertices: RDD[(VertexId, TopicCounts)] = {
            val verticesTMP: RDD[(VertexId, TopicCounts)] =
              edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
                val random = new Random(partIndex + randomSeed)
                partEdges.flatMap { edge =>
                  val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0)
                  val sum = gamma * edge.attr
                  Seq((edge.srcId, sum), (edge.dstId, sum))
                }
              }
            verticesTMP.reduceByKey(_ + _) // RDD lineage: verticesTMP <- edges <- docs; first use of docs
          }
          // Partition such that edges are grouped by document
          this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D)
          this.k = k
          this.vocabSize = docs.take(1).head._2.size // second use of docs
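      A minimal sketch of the kind of change this report suggests (not the actual Spark patch; the storage level and the exact unpersist point are assumptions):

        // Hedged sketch, not the committed fix: persist docs before its first use and
        // release it once both uses are done. StorageLevel.MEMORY_AND_DISK is an
        // assumption, not taken from the Spark source.
        import org.apache.spark.storage.StorageLevel

        docs.persist(StorageLevel.MEMORY_AND_DISK)
        // ... build edges, docTermVertices and this.graph exactly as in the snippet above ...
        this.vocabSize = docs.take(1).head._2.size // second use of docs
        // Unpersist only once nothing downstream still needs to rebuild docs from its
        // lineage (e.g. after the graph has been materialized); otherwise docs will be
        // recomputed anyway.
        docs.unpersist()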
      

      This issue is reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.
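      For readers unfamiliar with the pattern, a self-contained illustration of the misuse class (not from the Spark code base; the file name and parsing logic are made up): an RDD consumed by more than one action is rebuilt from its lineage for every action unless it is persisted first.

        // Stand-alone illustration, not taken from Spark itself: without persist(),
        // each action below re-reads and re-parses the input file, because the RDD is
        // recomputed from its lineage for every action.
        import org.apache.spark.sql.SparkSession

        object PersistPattern {
          def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder()
              .master("local[*]")
              .appName("persist-pattern")
              .getOrCreate()
            val sc = spark.sparkContext

            // hypothetical input file and transformation
            val parsed = sc.textFile("input.txt").map(_.split("\\s+").length)

            parsed.persist()            // cache before the first action
            val total = parsed.sum()    // first action
            val sample = parsed.take(1) // second action, served from the cache
            parsed.unpersist()

            println(s"total=$total, first=${sample.mkString(",")}")
            spark.stop()
          }
        }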


            People

              Assignee: Unassigned
              Reporter: spark_cachecheck (IcySanwitch)
              Votes: 0
              Watchers: 1
