Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: ML

    Description

      We need to explore how to cache DataFrames during the execution of Pipelines. It's a hard problem in general to handle automatically or manually, so we should start with some design discussions about:

      • How to control it manually
      • Whether & how to handle it automatically
      • API changes needed for each
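
      For context on what manual control looks like today: a minimal sketch of the existing workaround, which splits the pipeline and caches the intermediate DataFrame by hand. Everything below uses only the current public API; `corpus` is an assumed DataFrame with a "text" column, and the column names and parameters are illustrative.

      ManualCaching.scala
      import org.apache.spark.ml.Pipeline
      import org.apache.spark.ml.clustering.LDA
      import org.apache.spark.ml.feature.{CountVectorizer, Tokenizer}

      val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
      val countVectorizer = new CountVectorizer()
        .setInputCol("words")
        .setOutputCol("features")
        .setVocabSize(1000)

      // Fit the feature stages as their own pipeline so the featurized output
      // can be cached explicitly before the iterative LDA stage.
      // `corpus` is an assumed, pre-existing DataFrame.
      val featureModel = new Pipeline()
        .setStages(Array(tokenizer, countVectorizer))
        .fit(corpus)
      val featurized = featureModel.transform(corpus).cache()

      // Every LDA iteration now reads the cached, featurized DataFrame.
      val ldaModel = new LDA().setK(10).fit(featurized)
      featurized.unpersist()

      The drawback is exactly what this issue is about: the user has to break the Pipeline abstraction to control caching.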


        Activity

          matthieu Matthieu Baechler added a comment -

          Hi, I'm curious to know how to achieve caching as of today with ML pipelines?
          sachintyagi22 Sachin Tyagi added a comment -

          Hi, I want to take a stab at it. Here's how I am trying to approach it.

          A PipelineStage could be marked to persist its output DataFrame by calling a persist(storageLevel, columnExprs) method on it. This results in two cases:

          • For Transformers – their output DF should be marked to persist.
          • For Estimators – the output of their models should be marked to persist.

          For example,

          Example.scala
          val tokenizer = ...
          // The CountVectorizer estimator's model should persist its output DF
          // (and only the columns passed to persist) so that the LDA iterations
          // can run on the smaller persisted DataFrame.
          val countVectorizer = new CountVectorizer()
                .setInputCol("words")
                .setOutputCol("features")
                .setVocabSize(1000)
                .persist(StorageLevel.MEMORY_AND_DISK, "doc_id", "features")
          val lda = ...
          // Fit on the training DataFrame (`corpus` here) to get a PipelineModel.
          val pipelineModel = new Pipeline()
                .setStages(Array(tokenizer, countVectorizer, lda))
                .fit(corpus)
          

          Also, there should be a way to use the fitted pipeline to transform an already persisted DataFrame, if that DataFrame was persisted as part of some stage during fit(). Otherwise we end up doing unnecessary work in some cases (in the above example, tokenizing and count-vectorizing the input DataFrame again just to get topic distributions).

          In such a case, only the necessary stages should be invoked to transform, as sketched below.

          Continue.scala
          // The pipeline model should be able to identify whether the passed DF
          // was persisted as part of some stage and then run only the needed
          // stages. In this case, the model should run only the LDA stage.
          pipelineModel.transform(countVectorizer.getCacheDF())
          
          // This should run all stages.
          pipelineModel.transform(unpersistedDF)
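
          To make the stage-skipping idea concrete, here is a hypothetical sketch of the lookup PipelineModel.transform might perform. Nothing here exists in Spark today; cachedOutputOf stands in for whatever bookkeeping the persist() proposal would introduce, and ResumableTransform and transformFrom are made-up names.

          ResumableTransform.scala
          import org.apache.spark.ml.{PipelineModel, Transformer}
          import org.apache.spark.sql.DataFrame

          object ResumableTransform {
            // Hypothetical: the DataFrame a stage cached during fit(), if any.
            def cachedOutputOf(stage: Transformer): Option[DataFrame] = ???

            def transformFrom(model: PipelineModel, df: DataFrame): DataFrame = {
              // Find the last stage whose cached output is exactly the DataFrame
              // we were handed; if none matches, resumeAt is -1 and all stages run.
              val resumeAt = model.stages.lastIndexWhere(s => cachedOutputOf(s).exists(_ eq df))
              model.stages.drop(resumeAt + 1).foldLeft(df)((cur, stage) => stage.transform(cur))
            }
          }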
          

          In my mind, this can be achieved by modifying the PipelineStage, Pipeline, and PipelineModel classes, specifically their transform and transformSchema methods, and by adding the appropriate persist() method(s) to PipelineStage. A sketch of one possible shape for that hook follows.
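
          The sketch below shows one way the persist() hook could hang off PipelineStage; every name in it (CachingStage, maybePersist, getCachedDF) is hypothetical and not part of Spark.

          CachingStage.scala
          import org.apache.spark.ml.PipelineStage
          import org.apache.spark.sql.DataFrame
          import org.apache.spark.storage.StorageLevel

          trait CachingStage { self: PipelineStage =>
            private var level: Option[StorageLevel] = None
            private var cols: Seq[String] = Nil
            private var cached: Option[DataFrame] = None

            def persist(storageLevel: StorageLevel, columns: String*): this.type = {
              level = Some(storageLevel)
              cols = columns
              this
            }

            // Pipeline/PipelineModel would call this after the stage produces its
            // output: prune to the requested columns, persist, remember the result.
            protected def maybePersist(df: DataFrame): DataFrame = level match {
              case Some(l) =>
                val pruned = if (cols.nonEmpty) df.select(cols.head, cols.tail: _*) else df
                pruned.persist(l)
                cached = Some(pruned)
                pruned
              case None => df
            }

            def getCachedDF: Option[DataFrame] = cached
          }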

          Please let me know your comments on this approach, especially if you see any issues or things that need to be taken care of. I can submit a PR soon so we can see how it looks.

          apachespark Apache Spark added a comment -

          User 'sachintyagi22' has created a pull request for this issue:
          https://github.com/apache/spark/pull/17332


          People

            Assignee: Unassigned
            Reporter: Joseph K. Bradley
            Votes: 1
            Watchers: 7
