SPARK-13048: EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.5.2
    • Fix Version/s: 2.0.0
    • Component/s: MLlib
    • Labels: None
    • Environment: Standalone Spark cluster

    Description

      In EMLDAOptimizer, all checkpoints are deleted before returning the DistributedLDAModel.

      The most recent checkpoint is still necessary for operations on the DistributedLDAModel in a couple of scenarios (a minimal illustration follows this list):

      • The graph doesn't fit in memory on the worker nodes (e.g. a very large data set), so partitions have to be re-read from the checkpoint.
      • A late worker failure forces partitions to be recomputed from the checkpoint the graph now depends on.
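
      The underlying mechanism is generic to checkpointed RDDs: checkpointing truncates the lineage, so any later recomputation has to read the checkpoint files back from disk. A minimal sketch of what happens once those files are gone (assuming a spark-shell session where sc is the SparkContext; the checkpoint directory is a placeholder):

        import org.apache.hadoop.fs.{FileSystem, Path}

        sc.setCheckpointDir("/tmp/checkpoints")            // placeholder directory
        val rdd = sc.parallelize(1 to 1000000).map(_ * 2)
        rdd.checkpoint()
        rdd.count()   // materializes the checkpoint; the lineage now ends at its files

        // Simulate what deleteAllCheckpoints() does: remove the files the RDD now depends on.
        val fs = FileSystem.get(sc.hadoopConfiguration)
        rdd.getCheckpointFile.foreach(f => fs.delete(new Path(f), true))

        rdd.count()   // nothing is cached, so this must re-read the deleted files and
                      // typically fails with java.io.FileNotFoundException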

      I ran into this problem running a 10M-record LDA model in a memory-starved environment. The model consistently failed either in the collect at LDAModel.scala:528 stage (when converting to a LocalLDAModel) or in the reduce at LDAModel.scala:563 stage (when calling "describeTopics" on the model). In both cases, a FileNotFoundException is thrown when attempting to access a checkpoint file.
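
      For reference, a sketch of the call pattern that hits this (parameters, paths, and the corpus are placeholders; the failure only shows up once graph partitions have to be recomputed from the deleted checkpoint, e.g. under memory pressure or after a worker failure):

        import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
        import org.apache.spark.mllib.linalg.Vector
        import org.apache.spark.rdd.RDD

        sc.setCheckpointDir("/hdfs/path/to/checkpoints")   // placeholder path

        // Assumed to be prepared elsewhere: (document id, term-count vector) pairs.
        val corpus: RDD[(Long, Vector)] = ???

        val lda = new LDA()
          .setK(100)                    // placeholder parameters
          .setMaxIterations(100)
          .setCheckpointInterval(10)
          .setOptimizer("em")           // use EMLDAOptimizer

        val model = lda.run(corpus).asInstanceOf[DistributedLDAModel]

        // deleteAllCheckpoints() has already run by the time run() returns, so either
        // call below can throw FileNotFoundException when the graph is recomputed:
        val localModel = model.toLocal          // "collect at LDAModel.scala:528"
        val topics = model.describeTopics(10)   // "reduce at LDAModel.scala:563"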

      I'm not sure what the correct fix is here; it might involve a change to the class signature. A simpler alternative is to leave the last checkpoint in place and expect the user to clean the checkpoint directory themselves.

      java.io.FileNotFoundException: File does not exist: /hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
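
      With the simpler fix above, the user-side cleanup it expects could be as small as the following sketch (the directory is a placeholder for whatever was passed to sc.setCheckpointDir, to be run only once nothing depends on the model's graph anymore):

        import org.apache.hadoop.fs.{FileSystem, Path}

        val checkpointDir = new Path("/hdfs/path/to/checkpoints")   // placeholder
        val fs = checkpointDir.getFileSystem(sc.hadoopConfiguration)
        fs.delete(checkpointDir, true)   // recursively remove the remaining checkpoint files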
      

      Relevant code is included below.

      LDAOptimizer.scala:

        override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
          require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
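          // Deleting every checkpoint here also deletes the most recent one, which the
          // returned DistributedLDAModel's graph may still depend on (the bug reported here).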
          this.graphCheckpointer.deleteAllCheckpoints()
          // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
          // LDAModel.toLocal conversion
          new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
            Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
            iterationTimes)
        }
      

      PeriodicCheckpointer.scala:

        /**
         * Call this at the end to delete any remaining checkpoint files.
         */
        def deleteAllCheckpoints(): Unit = {
          while (checkpointQueue.nonEmpty) {
            removeCheckpointFile()
          }
        }
      
        /**
         * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
         * This prints a warning but does not fail if the files cannot be removed.
         */
        private def removeCheckpointFile(): Unit = {
          val old = checkpointQueue.dequeue()
          // Since the old checkpoint is not deleted by Spark, we manually delete it.
          val fs = FileSystem.get(sc.hadoopConfiguration)
          getCheckpointFiles(old).foreach { checkpointFile =>
            try {
              fs.delete(new Path(checkpointFile), true)
            } catch {
              case e: Exception =>
                logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
                  checkpointFile)
            }
          }
        }
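
      One way to realize the simpler fix suggested above would be a variant of deleteAllCheckpoints() in PeriodicCheckpointer that keeps the newest checkpoint on disk. The method name and its use by EMLDAOptimizer are assumptions for illustration, not the merged patch:

        /**
         * Sketch: delete all checkpoints except the most recent one, so a model returned
         * to the caller can still recompute its graph from that checkpoint.
         */
        def deleteAllCheckpointsButLast(): Unit = {
          // removeCheckpointFile() always dequeues the oldest checkpoint, so stopping at a
          // queue size of 1 leaves only the newest checkpoint's files on disk.
          while (checkpointQueue.size > 1) {
            removeCheckpointFile()
          }
        }

      EMLDAOptimizer.getLDAModel would then call deleteAllCheckpointsButLast() instead of deleteAllCheckpoints(), leaving removal of that last checkpoint to the user (for example with the cleanup snippet above).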
      

People

    • Assignee: Joseph K. Bradley (josephkb)
    • Reporter: Jeff Stein (jvstein)
