Details
Description
In EMLDAOptimizer, all checkpoints are deleted before returning the DistributedLDAModel.
The most recent checkpoint is still necessary for operations on the DistributedLDAModel under a couple scenarios:
- The graph doesn't fit in memory on the worker nodes (e.g. very large data set).
- Late worker failures that require reading the now-dependent checkpoint.
I ran into this problem running a 10M record LDA model in a memory starved environment. The model consistently failed in either the collect at LDAModel.scala:528 stage (when converting to a LocalLDAModel) or in the reduce at LDAModel.scala:563 stage (when calling "describeTopics" on the model). In both cases, a FileNotFoundException is thrown attempting to access a checkpoint file.
I'm not sure what the correct fix is here; it might involve a class signature change. An alternative simple fix is to leave the last checkpoint around and expect the user to clean the checkpoint directory themselves.
java.io.FileNotFoundException: File does not exist: /hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
Relevant code is included below.
LDAOptimizer.scala:
override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = { require(graph != null, "graph is null, EMLDAOptimizer not initialized.") this.graphCheckpointer.deleteAllCheckpoints() // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in // LDAModel.toLocal conversion new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize, Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration, iterationTimes) }
PeriodicCheckpointer.scala
/** * Call this at the end to delete any remaining checkpoint files. */ def deleteAllCheckpoints(): Unit = { while (checkpointQueue.nonEmpty) { removeCheckpointFile() } } /** * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files. * This prints a warning but does not fail if the files cannot be removed. */ private def removeCheckpointFile(): Unit = { val old = checkpointQueue.dequeue() // Since the old checkpoint is not deleted by Spark, we manually delete it. val fs = FileSystem.get(sc.hadoopConfiguration) getCheckpointFiles(old).foreach { checkpointFile => try { fs.delete(new Path(checkpointFile), true) } catch { case e: Exception => logWarning("PeriodicCheckpointer could not remove old checkpoint file: " + checkpointFile) } } }
Attachments
Issue Links
- relates to
-
SPARK-14420 keepLastCheckpoint Param for Python LDA with EM
- Closed
- links to