Hi, I want to take a stab at it. Here's how I am trying to approach it.
A PipelineStage could be marked to persist its output DataFrame by calling persist(storageLevel, columnExprs). This covers two cases:
- For Transformers, the DataFrame produced by transform() would be marked to persist.
- For Estimators, the output of the models they fit would be marked to persist.
For example,
val tokenizer = ...
val countVectorizer = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(1000)
  .persist(StorageLevel.MEMORY_AND_DISK, "doc_id", "features")
val lda = ...
val pipelineModel = new Pipeline().setStages(Array(tokenizer, countVectorizer, lda))
There should also be a way to use the fitted pipeline to transform an already-persisted DataFrame, if that DataFrame was persisted as part of some stage during fit(). Otherwise we end up doing unnecessary work in some cases (re-tokenizing and re-count-vectorizing the input DataFrame just to get topic distributions, in the example above).
Instead, in such a case, only the stages downstream of the cached one should be invoked:
pipelineModel.transform(countVectorizer.getCacheDF())
pipelineModel.transform(unpersistedDF)
In my mind, this can be achieved by modifying the PipelineStage, Pipeline, and PipelineModel classes (specifically, their transform and transformSchema methods), and, of course, by adding the appropriate persist() method(s) on PipelineStage.
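To make the shape concrete, here is a rough sketch of the proposed additions. This is pseudocode against the current Spark ML classes, not working code: the fields and the cachedDF/getCacheDF names are placeholders I made up for illustration, and the real PipelineStage is an abstract class with more machinery than shown.

```scala
// Sketch of the proposed API (placeholder names, not existing Spark API).
abstract class PipelineStage {
  private var storageLevel: Option[StorageLevel] = None
  private var persistCols: Seq[String] = Nil
  private[ml] var cachedDF: Option[DataFrame] = None

  def persist(level: StorageLevel, cols: String*): this.type = {
    storageLevel = Some(level)
    persistCols = cols
    this
  }

  // Returns the DataFrame cached during fit(), if any.
  def getCacheDF(): DataFrame = cachedDF.getOrElse(
    throw new IllegalStateException("This stage's output was not persisted."))
}

// In PipelineModel.transform: if the incoming DataFrame is the cached
// output of stage i, start the chain at stage i + 1 instead of stage 0.
def transform(df: DataFrame): DataFrame = {
  val start = stages.lastIndexWhere(_.cachedDF.exists(_ eq df)) + 1
  stages.drop(start).foldLeft(df)((cur, stage) => stage.transform(cur))
}
```

The skip logic above matches by reference equality, which only catches the exact DataFrame returned by getCacheDF(); a real implementation would need a more robust way to tie a DataFrame back to the stage that produced it.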
Please let me know your comments on this approach, specifically if you see any issues or things that need to be taken care of. I can submit a PR soon so we can see how it looks.
Hi, I'm curious to know how caching can be achieved with ML pipelines as of today?
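For reference, I believe the usual workaround today is to split the pipeline manually: fit the cheap feature stages first, cache their output with the existing Dataset.persist API, and then fit the expensive stage on the cached result. A sketch, reusing the tokenizer/countVectorizer/lda/corpus names from the example above:

```scala
// Manual caching with today's API: split the pipeline at the point
// whose output you want to reuse.
val featurePipeline = new Pipeline().setStages(Array(tokenizer, countVectorizer))
val featureModel = featurePipeline.fit(corpus)

val features = featureModel.transform(corpus)
  .persist(StorageLevel.MEMORY_AND_DISK)   // existing Dataset.persist

val ldaModel = lda.fit(features)           // fits against the cached features
val topics = ldaModel.transform(features)  // no re-tokenizing / re-vectorizing

features.unpersist()
```

The obvious downside, and part of the motivation for the proposal above, is that you lose the single fitted PipelineModel and have to manage the split and the unpersist() call yourself.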