SOLR-9258: Optimizing, storing and deploying AI models with Streaming Expressions

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 6.3
    • Component/s: None
    • Security Level: Public (Default Security Level. Issues are Public)
    • Labels: None

      Description

      This ticket describes a framework for optimizing, storing and deploying AI models within the Streaming Expression framework.

      Optimizing
      Cao Manh Dat has contributed SOLR-9252, which provides Streaming Expressions for both feature selection and optimization of a logistic regression text classifier. SOLR-9252 is also a great working example of optimizing a machine learning model using an in-place parallel iterative algorithm.

      Storing

      Both features and optimized models can be stored in SolrCloud collections using the update expression. Using Cao Manh Dat's example in SOLR-9252, the pseudo code for storing features would be:

      update(featuresCollection, 
             featuresSelection(collection1, 
                                  id="myFeatures", 
                                  q="*:*",  
                                  field="tv_text", 
                                  outcome="out_i", 
                                  positiveLabel=1, 
                                  numTerms=100))
      

      The id field can be added to the featuresSelection expression so that the features can later be retrieved from the collection they are stored in.
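
      Once stored, the features can be pulled back with a simple query against that collection. The pseudo code below is only a sketch; the featuresCollection name and the feature field names are assumptions for illustration:

      search(featuresCollection,
             q="id:myFeatures",
             fl="term_s, score_f",
             sort="score_f desc")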

      Deploying

      With the introduction of the topic() expression, SolrCloud can be treated as a distributed message queue. This messaging capability can be used to deploy models and process data through the models.

      To implement this approach a classify() function can be created that uses a topic() function to return both the model and the data to be classified:

      The pseudo code looks like this:

      classify(topic(models, q="modelID", fl="features, weights"),
               topic(emails, q="*:*", fl="id, body", rows="500", version="3232323"))
      

      In the example above the classify() function uses the topic() function to retrieve the model. Each time there is an update to the model in the index, the topic() expression will automatically read the new model.

      The topic() function is also used to pull in the data set that is being classified. Notice the version parameter. This will be added to the topic function to support pulling results from a specific version number (JIRA ticket to follow).

      With this approach both the model and the data to process through the model are treated as messages in a message queue.

      The daemon function can be used to send the classify function to Solr where it will be run in the background. The pseudo code looks like this:

      daemon(...,
               update(classifiedEmails, 
                       classify(topic(models, q="modelID", fl="features, weights"),
                                 topic(emails, q="*:*", fl="id, body", rows="500", version="3232323"))))
      

      In this scenario the daemon will run the classify function repeatedly in the background. With each run the topic() functions will re-pull the model if the model has been updated. It will also pull a new set of emails to be classified. The classified emails can be stored in another SolrCloud collection using the update() function.

      Using this approach, emails can be classified in batches. The daemon can continue to run even after all the emails have been classified. New emails added to the emails collection will then be automatically classified when they enter the index.

      Classification can be done in parallel once SOLR-9240 is completed. This will allow topic() results to be partitioned across worker nodes so they can be processed in parallel. The pseudo code for this is:

      parallel(workerCollection, worker="20", ...,
               daemon(...,
                         update(classifiedEmails, 
                                 classify(topic(models, q="modelID", fl="features, weights", partitionKeys="none"),
                                          topic(emails, q="*:*", fl="id, body", rows="500", version="3232323", partitionKeys="id")))))
      

      The code above sends a daemon to 20 workers, which will each classify a partition of records pulled by the topic() function.

      AI based alerting

      If the version parameter is not supplied to the topic stream, it will stream only new content from the topic rather than starting from an older version number.

      In this scenario the topic function behaves like an alert. Pseudo code for alerts looks like this:

      daemon(...,
               alert(..., 
                   classify(topic(models, q="modelID", fl="features, weights"),
                             topic(emails, q="*:*", fl="id, body", rows="500"))))
      

      In the example above an alert() function wraps the classify() function and takes actions based on the classification of documents. Developers can build their own alert functions using the Streaming API and plug them in to provide custom actions.

      1. SOLR-9258.patch
        25 kB
        Cao Manh Dat
      2. ModelCache.java
        4 kB
        Joel Bernstein
      3. ModelCache.java
        4 kB
        Joel Bernstein
      4. SOLR-9258.patch
        34 kB
        Joel Bernstein
      5. SOLR-9258.patch
        34 kB
        Joel Bernstein
      6. SOLR-9258.patch
        35 kB
        Joel Bernstein
      7. SOLR-9258.patch
        35 kB
        Joel Bernstein
      8. SOLR-9258.patch
        35 kB
        Joel Bernstein


          Activity

          shalinmangar Shalin Shekhar Mangar added a comment -

          Closing after 6.3.0 release.

          joel.bernstein Joel Bernstein added a comment -

          Cao Manh Dat, thanks for all your work on this great ticket!

          caomanhdat Cao Manh Dat added a comment -

          Thanks Joel Bernstein for your hard work on this ticket.

          jira-bot ASF subversion and git services added a comment -

          Commit 5adb8f1bd5905f6749e57b7e27d467a4f36c56b2 in lucene-solr's branch refs/heads/branch_6x from Joel Bernstein
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=5adb8f1 ]

          SOLR-9258: Update CHANGES.txt

          jira-bot ASF subversion and git services added a comment -

          Commit 787d905edcf813f2e02155aabcc0c1dd25509b21 in lucene-solr's branch refs/heads/master from Joel Bernstein
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=787d905 ]

          SOLR-9258: Update CHANGES.txt

          jira-bot ASF subversion and git services added a comment -

          Commit 568b54687a938ed0f6cd8b29100eda2c0b547975 in lucene-solr's branch refs/heads/branch_6x from Joel Bernstein
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=568b546 ]

          SOLR-9258: Fix precommit

          jira-bot ASF subversion and git services added a comment -

          Commit 8f00bcb1a0d88a6898e3ae6b8749610b2bd47d3c in lucene-solr's branch refs/heads/master from Joel Bernstein
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=8f00bcb ]

          SOLR-9258: Fix precommit

          jira-bot ASF subversion and git services added a comment -

          Commit 9cd6437d4b21dd6d9c16688eedb5af012ea67e86 in lucene-solr's branch refs/heads/master from Joel Bernstein
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=9cd6437 ]

          SOLR-9258: Optimizing, storing and deploying AI models with Streaming Expressions

          joel.bernstein Joel Bernstein added a comment -

          New patch with a parallel classification test and some parameter clarification

          joel.bernstein Joel Bernstein added a comment -

          New patch which adds the cacheMillis param to the model() expression
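
          A rough usage sketch (the collection name, model id and cache interval below are placeholders for illustration):

          model(models, id="myModel", cacheMillis="5000")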

          joel.bernstein Joel Bernstein added a comment -

          New patch with a passing StreamExpressionTest for the classify function. It also exercises the model function.

          I'll add some code to test the model caching behavior shortly.

          caomanhdat Cao Manh Dat added a comment -

          +1 The patch looks great.

          joel.bernstein Joel Bernstein added a comment -

          New patch which adds the ModelCache and ModelStream, and breaks the ClassifyStream out into its own class. The test case has been adjusted slightly to accommodate the new classes.

          The core algorithms, though, are the same as in the original patch.

          I haven't actually run this code yet so this is just for review.

          joel.bernstein Joel Bernstein added a comment -

          Ah I see the problem now.

          Ok, it sounds like any Stream that uses analyzers will be tied to Solr core. This is OK, because the main use case is to run the expressions through StreamHandler.

          Maybe we need a new package in Solr core for streams that rely on Solr core classes. I'll put some thought into this.

          I'll keep working with the patch.

          caomanhdat Cao Manh Dat added a comment -

          We can't put ClassifyStream inside solrj.io, because the solrj module does not depend on solr-core or lucene-core, so we can't access Analyzer or SolrCore from ClassifyStream.
          I also can't find any package inside solr-core that is appropriate for this class, so I made ClassifyStream an inner class of the StreamHandler. (Hints are welcome.)

          joel.bernstein Joel Bernstein added a comment -

          New version of the ModelCache that doesn't synchronize on code with blocking io. This is looking much better.

          joel.bernstein Joel Bernstein added a comment -

          The more I look at the ModelCache the less I like it. My main concern is the synchronization which could become a bottleneck.

          Looking into other caching approaches...

          joel.bernstein Joel Bernstein added a comment -

          Attaching ModelCache.java for review. This should be a very efficient cache for models. It checks for Models at intervals using a TopicStream. If no changes have occurred to the model it uses the cached version.

          joel.bernstein Joel Bernstein added a comment -

          Aside from the inner class issue the ClassifyStream is looking very good.

          One thing we should talk about is the model streaming. I think it makes sense to pull the model in the Classify.open() method. I think it also makes sense to have a specific ModelStream implementation that has this behavior:

          1) Checks a local cache of the models to see if the model is in memory. The cache can be a simple LRUCache.
          2) If the model is already in the cache, attempt to pull the model with a TopicStream. If nothing comes back, the model hasn't been changed so use the cached version. If the model does come back, use the new model and update the cache.
          3) If the model is not already in the cache, pull the model using a CloudSolrStream and update the cache.

          The topic checkpoints can be kept in the same solr cloud collection as the models. So the syntax would be:

          model(collection, modelID)

          joel.bernstein Joel Bernstein added a comment -

          Ok, first question.

          Why is the ClassifyStream an inner class of the StreamHandler?

          joel.bernstein Joel Bernstein added a comment -

          Thanks for working on this! I'll start reviewing this patch today.

          Let's create a new ticket for the classify expression and link it to this ticket. There will be a number of approaches for deploying models, which we can build in separate tickets and link to this one.

          caomanhdat Cao Manh Dat added a comment -

          Re-upload the patch

          caomanhdat Cao Manh Dat added a comment -

          A patch that adds the classify stream to Solr:

          classify(
            topic(checkpointCollection, uknownCollection, q="*:*", fl="text_s, id", qt="/export", sort="id asc", id="1000000", initialCheckpoint="0"),
            topic(checkpointCollection, modelCollection, q="name_s:model", fl="iteration_i, name_s, terms_ss, idfs_ds, weights_ds, field_s", sort="iteration_i asc", include="true", id="2000000"),
            outcome="out_d",
            field="text_s",
            fieldType="whitetok"
          )
          

          Parameters of the classify stream:

          • the first sub-stream fetches the documents that need to be classified
          • the second sub-stream fetches the latest model
          • outcome: the name of the field that the classify stream will decorate documents with
          • field: the text field that the classify stream will apply the model to
          • fieldType: the classify stream uses this fieldType to analyze the text into the corresponding term vector

          This patch also includes a minor change to TopicStream (a new include parameter).

          solrParams.add("fq", "{!frange cost=100 incl="+Boolean.toString(includeCheckpoint)+" l="+checkpoint+"}_version_");
          

          It helps the classify stream re-fetch the last model (if the collection doesn't have a newer model).
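
          For example, with includeCheckpoint=true and a checkpoint value of 42 (illustrative values), the snippet above produces the filter query:

          {!frange cost=100 incl=true l=42}_version_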


  People

    • Assignee: Joel Bernstein
    • Reporter: Joel Bernstein
    • Votes: 2
    • Watchers: 10
