  SOLR-7535

Add UpdateStream to Streaming API and Streaming Expression

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: clients - java, SolrJ
    • Labels: None

      Description

      This ticket adds an UpdateStream implementation to the Streaming API and streaming expressions. The UpdateStream will wrap a TupleStream and send the Tuples it reads to a SolrCloud collection to be indexed.

      This will allow users to pull data from different SolrCloud collections, merge and transform the streams, and send the transformed data to another SolrCloud collection.

      Attachments

      1. SOLR-7535.patch
        36 kB
        Joel Bernstein
      2. SOLR-7535.patch
        30 kB
        Joel Bernstein
      3. SOLR-7535.patch
        28 kB
        Joel Bernstein
      4. SOLR-7535.patch
        21 kB
        Joel Bernstein
      5. SOLR-7535.patch
        21 kB
        Jason Gerlowski
      6. SOLR-7535.patch
        20 kB
        Jason Gerlowski
      7. SOLR-7535.patch
        11 kB
        Jason Gerlowski
      8. SOLR-7535.patch
        12 kB
        Jason Gerlowski


          Activity

          gerlowskija Jason Gerlowski added a comment -

          I'm in the process of hacking together a first pass at this.

          Going well for the most part, but I did run into one sticking point. UpdateStream.read() takes each tuple and sends it along to a SolrCloud collection. I was planning on converting the tuple into a SolrInputDocument, and then using CloudSolrClient.add(doc) to send along the converted tuple.

          It's not super hard to take a straw-man approach to the conversion:

              // Copy each field of the source tuple into a SolrInputDocument.
              final SolrInputDocument doc = new SolrInputDocument();
              for (Object fieldName : tupleFromSource.fields.keySet()) {
                doc.addField((String) fieldName, tupleFromSource.get(fieldName));
              }
          

          Is this a reasonable approach? I think this'll work for simple cases, but I wasn't sure how it'd do with more complex tuples. Do tuples ever have non-String keys? Is there any special treatment that I should know about for nested-docs (I wasn't sure how these mapped to tuples).

          I'm assuming there must be some code out there that does the reverse conversion (from Solr results to tuples). I nosed around a bit in StreamHandler.handleRequestBody and the various TupleStream implementations, but I didn't find anything too promising. Does anyone know where that might live? If I found that code, it'd probably be helpful for doing the opposite conversion for UpdateStream.

          dpgove Dennis Gove added a comment - - edited

          For the original mapping take a look at SolrStream, particularly the mapFields(...) function and where it is called from.

          It might make sense to require a SelectStream as the inner stream so that one can select the fields they want to insert. Or perhaps this stream's expression could support a way to select fields itself and internally use a SelectStream to implement that feature.

          joel.bernstein Joel Bernstein added a comment - - edited

          I would start with the simplest case of key/value pairs. Assume String keys for the first round of work as well. So your approach looks fine.

          I would shoot for enough functionality to support a SQL SELECT INTO query, because the next step will be to wire the UpdateStream into the SQLHandler.

          joel.bernstein Joel Bernstein added a comment - - edited

          As Dennis mentioned the SelectStream will handle the field mappings so no need to build that in.

          gerlowskija Jason Gerlowski added a comment -

          After a bit of thought and a holiday break, I've got my first attempt at this ready for some feedback.

          Notes about this Patch

          1.) No tests yet. It does work (I tried it out manually), but it's getting close to the end of my night, and I wanted to get this out there on the off chance that someone has the time to take a look and give me some feedback before I sit back down to work on this again tomorrow evening. But I am planning on adding tests to StreamExpressionTest, and StreamExpressionToExpessionTest.
          2.) I didn't make any attempt to restrict the TupleStream implementations that UpdateStream can wrap. Mainly because I didn't get around to it yet. But also because, IMO, there are use cases where a user wouldn't need to use a SelectStream (for example, if they're doing field filtering in their initial Solr query/search() expression). Happy to change this in a subsequent patch. Just wanted to see what people thought.
          3.) I kept my original tuple-to-input-doc mapping intact. It's limited, but as Joel mentioned, it will probably do the job for a first pass.

          Questions about Surrounding Code

          These aren't necessarily related to this JIRA/patch, but working on this patch made me think of a few questions that I couldn't figure out answers to on my own.

          1.) Many of the TupleStream implementations require a collection to be explicitly stated as the first argument (i.e. search(gettingstarted...)). However, the collection name is already specified in the URL path (i.e. localhost:7574/solr/gettingstarted/stream?...). Are these values ever allowed to be different?
          2.) Many of the Stream Expressions are specified using a syntax that mixes named parameters (rows, sort, zkHost, etc.), and unnamed parameters ('collection' is probably the most common). Are there any guidelines/logic around which parameters are named, and which are unnamed? If I'm creating a new TupleStream type (as we are here), are there any guidelines on what the expression interface should look like?

          Thanks in advance if anyone can help clarify some of those things for me. Should be back online soon to revise this further.

          dpgove Dennis Gove added a comment -

          I haven't looked at the patch yet but to answer your questions,

          1. The name of the collection in the URL path and the collection in any part of the expression can absolutely be different. There are a couple of cases where this difference will most likely appear. First, during a join or merge of multiple collections, only one of the collection names can be contained in the URL. For example

          innerJoin(
            search(people, fl="personId,name", q="*:*", sort="personId asc"),
            search(address, fl="personId,city", q="state:ny", sort="personId asc"),
            on="personId"
          )
          

          Two collections are being hit but only a single one can be included in the URL. There aren't any hard and fast rules about which one should be used in the URL and that decision could depend on a lot of different things, especially if the collections live in different clouds or on different hardware.

          There is also the possibility that the http request is being sent to what is effectively an empty collection which only exists to perform parallel work using the streaming api. For example, imagine you want to do some heavy metric processing but you don't want to use more resources than necessary on the servers where the collections live. You could set up an empty collection on totally different hardware, with the intent that that hardware acts solely as workers against the real collection. This would allow you to do the heavy lifting on separate hardware from where the collection actually lives.

          For these reasons the collection name is a required parameter in the base streams (CloudSolrStream and FacetStream).

          2. There are three types of parameters: positional, unnamed, and named.
          Positional parameters are those which must exist in some specific location in the expression. IIRC, the only positional parameters are the collection names in the base streams. This is done because the collection name is critical, and as such we can say it is the first parameter, regardless of anything else included.

          Unnamed parameters are those whose meaning can be determined by the content of the parameter. For example,

          rollup(
            search(people, fl="personId,name,age", q="*:*", sort="personId asc"),
            max(age),
            min(age),
            avg(age)
          )
          

          In this example we know that search(...) is a stream and max(...), min(...), and avg(...) are metrics. Unnamed parameters are also very useful in situations where the number of parameters of that type is non-deterministic. In the example above one could provide any number of metrics, and by keeping them unnamed the user can just keep adding new metrics without worrying about names. Another example of this is the MergeStream, where one can merge 2 or more streams together.

          Named parameters are used when you want to be very clear about what a particular parameter is being used for. For example, the "on" parameter in a join clause is to indicate that the join should be done on some field (or fields). The HashJoinStream is an interesting one because we have a named parameter "hashed" whose parameter needs to be a stream. In this case the decision to use a named parameter was made so as to be very clear to the user which stream is being hashed and which one is not. Generally it comes down to whether a parameter name would make things clearer for the user.

          joel.bernstein Joel Bernstein added a comment - - edited

          Jason Gerlowski, the patch looks good.

          Three comments

          1) I'd like to limit the changes in the patch to the UpdateStream if possible. It looks like the UpdateStream is extending CloudSolrStream which pushed some changes into CloudSolrStream. Let's have the UpdateStream extend TupleStream for now. In another ticket we can look at moving some shared methods to the TupleStream class to eliminate code duplication.

          2) Let's remove the commit following the EOF tuple. The UpdateStream is likely to be run in parallel, which means dozens of workers will be committing at the same time. We can add a CommitStream, which would not be run in parallel, that will commit after a number of updates or after it sees the EOF tuple.

          We'll implement the CommitStream in a different ticket. For now we can rely on autoCommits to commit and explicitly commit in the test cases.

          The pseudo code below shows a CommitStream wrapping an UpdateStream which is wrapped by a ParallelStream.

          commit(
                       collection1, 
                       parallel(
                                    update(
                                                collection1,
                                                search(collection2...))
                        ), 
                        100000)
          

          3) We'll want to implement batching. So we'll need to add a batch size parameter to the UpdateStream. Then we'll send the updates in a batch to the CloudSolrClient. After each batch, the read() method should return a Tuple with the number of documents indexed in the batch. This Tuple can be used by the CommitStream to commit every X records, and can be returned to the client, which will ensure that we don't get client timeouts due to inactivity.

          So each call to the UpdateStream.read() will read a batch of docs from the sourceStream, index the batch and return a Tuple with the count.
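
          A rough Java sketch of the batching read() described above. This is illustrative only, not the committed implementation; the class name, the "docsUploaded" field, and the toDoc(...) helper are assumptions, and EOF handling (e.g. push-back of the EOF tuple) is elided.

          import java.io.IOException;
          import java.util.ArrayList;
          import java.util.HashMap;
          import java.util.List;
          import java.util.Map;

          import org.apache.solr.client.solrj.impl.CloudSolrClient;
          import org.apache.solr.client.solrj.io.Tuple;
          import org.apache.solr.client.solrj.io.stream.TupleStream;
          import org.apache.solr.common.SolrInputDocument;

          class BatchingUpdateSketch {
            private final TupleStream source;
            private final CloudSolrClient client;
            private final String collection;
            private final int batchSize;

            BatchingUpdateSketch(TupleStream source, CloudSolrClient client,
                                 String collection, int batchSize) {
              this.source = source;
              this.client = client;
              this.collection = collection;
              this.batchSize = batchSize;
            }

            // Reads up to batchSize tuples from the wrapped stream, indexes them as one
            // batch (no commit here), and returns a tuple carrying the batch count.
            Tuple readBatch() throws Exception {
              List<SolrInputDocument> batch = new ArrayList<>();
              while (batch.size() < batchSize) {
                Tuple tuple = source.read();
                if (tuple.EOF) {
                  break; // EOF handling (push-back, final EOF tuple) elided in this sketch
                }
                batch.add(toDoc(tuple));
              }
              if (!batch.isEmpty()) {
                client.add(collection, batch); // no commit; see the CommitStream discussion
              }
              Map<String, Object> out = new HashMap<>();
              out.put("docsUploaded", batch.size());
              return new Tuple(out);
            }

            // Straw-man tuple-to-document conversion from the earlier comment.
            private SolrInputDocument toDoc(Tuple tuple) {
              SolrInputDocument doc = new SolrInputDocument();
              for (Object field : tuple.fields.keySet()) {
                doc.addField((String) field, tuple.get(field));
              }
              return doc;
            }
          }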

          gerlowskija Jason Gerlowski added a comment -

          Thanks for taking the time to help me out Dennis, that makes a lot of sense and really helps.

          gerlowskija Jason Gerlowski added a comment -

          Thanks for the feedback Joel. (1) and (2) I get. (3)'s a little less clear to me.

          Are you saying that a single read() on an UpdateStream will call read() X times (i.e. batchSize times) on the wrapped stream, package and send those docs to a collection, and then return a single tuple that says how many tuples were read?

          Is it an issue at all that UpdateStream would be swallowing the individual tuples? This would prevent users from doing things (other than committing) with the output of UpdateStream. For example, the use-case below seems valid to me, but wouldn't be supported with the proposed behavior:

              update(collection5,
                     merge(
                         update(collection3, search(collection1, ...)),
                         update(collection4, search(collection2, ...))
                     )
              )
          

          Maybe there's not a real need to support that. And Streaming API users would still be able to do this, they'd just need to do it in 2 steps/requests instead of 1. I don't have a preference either way; just wanted to bring it up.

          dpgove Dennis Gove added a comment -

          In the Streaming API, read() is called until an EOF tuple is seen. This means that, even with an UpdateStream, one would have this code

          while (true) {
            tuple = updateStream.read();

            // if # of records read is some size, do a commit

            if (tuple.EOF) {
              break;
            }
          }
          

          I think it's the correct thing for an UpdateStream to swallow the individual tuples. The use-case you described isn't one I see existing. But if it did then I could see it being dealt with using a TeeStream. A TeeStream would work exactly like the unix command tee and take a single input stream and tee it out into multiple output streams. In this use-case, one would Tee the underlying searches. But again, I don't see this need actually existing.
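
          To make the loop above concrete, here is a hedged Java sketch of a client driving an update stream and committing every so often. It assumes each read() returns a count tuple with a "docsUploaded" field and that the commit threshold is chosen by the caller; both are illustrative assumptions, not settled API.

          import java.io.IOException;

          import org.apache.solr.client.solrj.SolrServerException;
          import org.apache.solr.client.solrj.impl.CloudSolrClient;
          import org.apache.solr.client.solrj.io.Tuple;
          import org.apache.solr.client.solrj.io.stream.TupleStream;

          class DrivingLoopSketch {
            // Reads count tuples until EOF, committing roughly every commitBatchSize docs
            // and once more at the end.
            static void drain(TupleStream updateStream, CloudSolrClient client,
                              String collection, long commitBatchSize)
                throws IOException, SolrServerException {
              long sinceLastCommit = 0;
              updateStream.open();
              try {
                while (true) {
                  Tuple tuple = updateStream.read();
                  if (tuple.EOF) {
                    break;
                  }
                  sinceLastCommit += tuple.getLong("docsUploaded");
                  if (sinceLastCommit >= commitBatchSize) {
                    client.commit(collection);
                    sinceLastCommit = 0;
                  }
                }
                client.commit(collection); // final commit after EOF
              } finally {
                updateStream.close();
              }
            }
          }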

          joel.bernstein Joel Bernstein added a comment - - edited

          Jason Gerlowski, you've described what I was thinking correctly.

          I think swallowing the Tuples is the correct behavior. Imagine 15 workers pulling Tuples from 20+ shards. The throughput would bottleneck if we funneled all those tuples back to one client.

          Think of the returned tuple as a type of useful aggregation like the RollupStream, which swallows Tuples on the worker nodes and returns aggregates to one client.

          dpgove Dennis Gove added a comment -

          I had an interesting thought related to the call to read().

          Should there be some distinction between a ReadStream and a WriteStream? A ReadStream is one which reads tuples out, while a WriteStream is one which writes tuples in. Up until this point we've only ever had ReadStreams and the read() method has always made sense. But the UpdateStream is a WriteStream and maybe it should have a different function, maybe write(). Also, it might be nice to be able to say in a stream that its direct incoming stream must be a WriteStream (for example, a CommitStream would only work on a WriteStream, while a RollupStream would only work on a ReadStream). (Though maybe it'd be interesting to do rollups over the output tuples of an UpdateStream...)

          Thoughts?

          joel.bernstein Joel Bernstein added a comment - - edited

          The problem is the ParallelStream takes a TupleStream. Possibly we'd need a ParallelWrite and ParallelRead stream. Let's not introduce that change into this ticket because I think it requires some more thought.

          dpgove Dennis Gove added a comment -

          I agree. It needs to be fleshed out some more.

          gerlowskija Jason Gerlowski added a comment -

          Latest patch addresses 3 of Joel's concerns (see above):

          1.) UpdateStream now extends TupleStream, instead of CloudSolrStream
          2.) UpdateStream no longer commits on EOF.
          3.) UpdateStream now takes a mandatory batchSize argument as a named parameter. It reads batchSize tuples from the wrapped stream before sending them off. It then emits a tuple with an "uploadedDocs" field.

          i.e. the stream now outputs data that looks like:

          {"result-set":
              {"docs":[
                  {"uploadedDocs":5},
                  {"uploadedDocs":5},
                  {"uploadedDocs":5},
                  ....
                  {"uploadedDocs":4,"EOF":true,"RESPONSE_TIME":146}]
              }
          }
          

          I thought a bit about making batchSize an optional parameter, and just using a reasonable default/fallback value when no value is provided. But I decided against it, since this is probably something a user should be deciding for themselves.

          Still no tests on this patch. Running late for work, so I can't add them now. Hopefully that'll be a job for this evening.

          caomanhdat Cao Manh Dat added a comment -

          Great feature. I just wonder about fault tolerance: can we resume the UpdateStream if something happens?

          joel.bernstein Joel Bernstein added a comment -

          Ah, just realized we forgot about multi-valued fields. I think we just need to check for Tuple values that are Lists and add them in a loop.
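
          For example, a minimal sketch of that check in the tuple-to-document conversion (the class and method names are illustrative, not part of the patch):

          import java.util.List;

          import org.apache.solr.common.SolrInputDocument;

          final class MultiValuedFieldSketch {
            // A List value is added element by element so it indexes as a multi-valued
            // field; anything else is added as a single value.
            static void addTupleValue(SolrInputDocument doc, String fieldName, Object value) {
              if (value instanceof List) {
                for (Object item : (List<?>) value) {
                  doc.addField(fieldName, item);
                }
              } else {
                doc.addField(fieldName, value);
              }
            }
          }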

          joel.bernstein Joel Bernstein added a comment -

          Cao Manh Dat, Adding fault tolerance is going to be important. But I think it will require quite a bit of thought to get right. Possibly we should have a separate ticket specifically for adding fault tolerance to the Streaming API. In that ticket we can discuss the different possible failures that can occur and how those failures can be handled.

          gerlowskija Jason Gerlowski added a comment - - edited

          +1 for adding fault tolerance, but for doing so under a separate JIRA ticket. This is something that probably needs to be thought about across the board.

          Additionally, just wanted to put some updates on my progress here. I sat down this morning to work on tests for UpdateStream. The simple cases all seem to work fine.

          I did run into two issues though when I tried to write a test that combined ParallelStream and UpdateStream (i.e. parallel(a, update(b... ):

          1.) Currently, if UpdateStream reaches EOF mid-batch, it sends out an EOF tuple that also contains a "docsUploaded" field. But ParallelStream currently swallows this tuple and spits out a completely clean EOF tuple. (It seems a few Stream types expect that EOF tuples don't have any substantive fields).

          This shouldn't be hard to fix. I can just change UpdateStream to emit EOF after emitting a tuple with the partial batch, i.e. instead of {EOF:true, docsUploaded:3}, just return {docsUploaded:3} followed by {EOF:true}.

          2.) ParallelStream works by providing partitionKeys to the underlying searches. However, this doesn't work with UpdateStream, which goes to the /update handler, not the /search handler. Since there's no partitioning, the same update gets run twice, putting two copies of the docs in the collection used by update().

          I didn't really anticipate running into any major problems in using ParallelStream with UpdateStream, but it looks to me like ParallelStream is only really appropriate for wrapping searches, not updates. (This reminds me a bit of Dennis' comments above about ReadStreams and WriteStreams). Am I interpreting this incorrectly?

          Running out of the house now, but I'll be back shortly to look at this again. Sorry if my notes above are a bit rough. I'm jotting them down half so I remember where I was, and I haven't really thought through things as well as I would've liked yet.

          joel.bernstein Joel Bernstein added a comment - - edited

          for #1, you've got the right plan.

          for #2:

          The partitionKeys will get added to the search(...). This will partition the Tuples evenly across the workers. Each worker will then update in parallel. This syntax should work:

          parallel(update(search(..., partitionKeys="x")))
          

          Because each worker gets a partition of the search(...), there should be no duplication of indexing.

          The /update handler simply receives the updates and is unaware of the parallelization. The search(...) will continue to use the /export handler.

          dpgove Dennis Gove added a comment -

          +1 on fault tolerance as well.

          1) I think the expected behavior of all streams is that the EOF tuple could contain extra metadata about the stream that is only known at the end. This allows clients (or other streams) to know that this metadata didn't come from a real document but is just EOF metadata. If there are streams which don't handle a non-empty EOF tuple, I think those streams should be corrected.

          2) I think you're correct about the ParallelStream and how it operates. I don't see a way for the ParallelStream, as currently implemented, to interact with the raw tuples coming out from a call to another stream's read() method. I.e., it depends on doing the partitioning at the source and cannot do it in the middle of a data pipeline. It'd be a nice feature to be able to take a single stream of data and split it out onto N streams across N workers.

          Here's an example of a pipeline I'd like to be able to create with a ParallelStream but currently cannot seem to. Essentially, do something with the data, then split it off to workers to perform the expensive operations, and then bring them back together (I hope the ascii art shows properly).

                                            / --- worker1 --- rollup --- sort ---\
          sourceA ---\                     /----- worker2 --- rollup --- sort ----\  
                      ----------- join ---<------ worker3 --- rollup --- sort -----> --- mergesort ---\
          sourceB ---/                     \----- worker4 --- rollup --- sort ----/                    >--- join ---- output
                                            \ --- worker5 --- rollup --- sort ---/         sourceC ---/
          

          My understanding is that the parallelization must be done at the start of the pipeline and cannot be done in the middle of the pipeline.

          Maybe a new stream is required that can split streams off to workers.

          joel.bernstein Joel Bernstein added a comment - - edited

          Yes, currently partitioning is only done as part of the search(). So any workflow that requires re-partitioning will have to be done in multiple steps. That's why this ticket is so important: the UpdateStream allows for write-backs.

          In the example above, the first join would need to be wrapped in an UpdateStream and sent to a temp index. The temp index would be used for the next steps.

          In the future we can look at faster ways to re-partition. One example would be to have the workers repartition to local disk. Then the second step could read from the worker nodes rather than searching. This still involves multiple steps but it would be much faster.

          dpgove Dennis Gove added a comment -

          Clever. I like it.

          joel.bernstein Joel Bernstein added a comment - - edited

          A few possible streams:

          PartitionStream: writes to local disk partitioning on keys. Used when the next stage does not require a re-sort.
          ShuffleStream: writes to local disk sorting and partitioning by keys. Used when the next stage requires a re-sort.
          HttpStream: Gets passed a list of URLs to read a stream from. This would read directly from worker nodes. We could simply point directly to the files and let Jetty stream the data back directly. As a bonus, this stream could also be a generic way to hook in any HTTP service.

          Step 1:

          parallel(partition(innerJoin(search(...), search(...))))

          Step 2:

          parallel(hashJoin(http(...), search(...)))
          

          The PartitionStream would return a Tuple, when it's finished writing the partitions, that includes its node address. There would need to be a little glue code that would gather the node addresses from step 1 and kick off step 2. This could be written in any language. The SQLHandler will of course handle the steps behind the scenes.

          joel.bernstein Joel Bernstein added a comment -

          We could also add a GlueStream to tie together the steps.

          
          glue(
            parallel(shuffle(innerJoin(search(...), search(...)))),
            parallel(partition(innerJoin(http(...), search(...)))),
            parallel(hashJoin(http(...), search(...)))
          )

          gerlowskija Jason Gerlowski added a comment -

          The partitionKeys will get added to the search(...)

          Ah, I see my mistake here. Reading the Streaming Expression wiki page (https://cwiki.apache.org/confluence/display/solr/Streaming+Expressions), I read

          The parallel function requires that the partitionKeys parameter be provided to the underlying searches.

          and interpreted it to mean that if I provided the parameter to parallel(), it would be passed through. But on a second glance, the example clearly shows that the caller needs to put it on the underlying search() themselves.

          I was seeing duplicate documents indexed because I wasn't providing a partition on the searches. So that's clearly my fault.

          That said, it'd be nice if there was a way to detect this misconfiguration from within ParallelStream. It'd be easy to do some sort of dumb check, such as ensuring the underlying expression contains the string 'partitionKeys'. There are a lot of obvious issues with that, but it might be better than nothing, and would let us spit out a helpful error message or warning. Or maybe this isn't really important enough to worry about at this point.

          joel.bernstein Joel Bernstein added a comment -

          An exception will be thrown if there is more than one worker specified and partitionKeys is not set. If that's not working then it's a regression.

          joel.bernstein Joel Bernstein added a comment -

          I think it would be an improvement to set the partitionKeys on the parallel() function and if needed override the partitionKeys on the search() function. Each search() might partition on different fields so the override needs to remain.

          gerlowskija Jason Gerlowski added a comment -

          After some more poking around last night and this morning (and with help from Joel and Dennis), I found where my confusion was coming from yesterday.

          I've updated the patch to include basic tests. StreamExpressionTest now has clauses for testUpdateStream and testParallelUpdateStream. I also added a testUpdateStream to StreamExpressionToExpessionTest. The tests (hopefully) do about what you expect them to.

          I want to stress that these are just basic tests though. There were a few other test cases that I thought of adding but didn't. (test where batchSize evenly divides into numResults for underlying stream, test where batchSize doesn't evenly divide, test where there are 0 results from underlying stream, test that nice messages are returned on common error cases, test that multivalued fields are handled properly, etc.)

          I'm happy to add these sorts of tests too if people think they're worth the future-maintenance and test-suite-runtime cost. (I think they're def worth it, but I wanted to defer to others with more experience before starting...just a sanity check). Ideally, since there's more cases I'm trying to cover, I'd like to put these tests in a separate file entirely (i.e. a new UpdateStreamTest).

          joel.bernstein Joel Bernstein added a comment -

          It looks like we still need to handle the multi-value field case.

          Your ideas for tests sound good. I would also add tests for different field types, both single and multi-value.

          I plan on pulling down your latest patch tomorrow and I'll add tests as well. I'll probably add tests to StreamingTests as well.

          I'll also manually test at scale before committing.

          gerlowskija Jason Gerlowski added a comment -

          I forgot to make the change Joel suggested for supporting multivalued fields. This patch is a small update to take care of that.

          joel.bernstein Joel Bernstein added a comment -

          New patch that wraps the stream source in a PushbackStream. This allows us to push back the EOF tuple and upload the batch. This is a nice approach that preserves the EOF tuple from the source stream in case there is info in the EOF tuple.
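
          Roughly, the push-back pattern looks like the sketch below. This is illustrative only and the committed patch may differ in details; the class and method names here are assumptions.

          import java.io.IOException;
          import java.util.ArrayList;
          import java.util.List;

          import org.apache.solr.client.solrj.io.Tuple;
          import org.apache.solr.client.solrj.io.stream.PushBackStream;

          final class PushBackBatchSketch {
            // Reads up to batchSize tuples; if EOF shows up mid-batch it is pushed back
            // so the source's EOF tuple (and any metadata on it) is preserved for later.
            static List<Tuple> nextBatch(PushBackStream source, int batchSize) throws IOException {
              List<Tuple> batch = new ArrayList<>();
              while (batch.size() < batchSize) {
                Tuple tuple = source.read();
                if (tuple.EOF) {
                  source.pushBack(tuple);
                  break;
                }
                batch.add(tuple);
              }
              return batch;
            }
          }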

          Existing tests are passing with this patch.

          I'll spend some time today expanding the tests.

          joel.bernstein Joel Bernstein added a comment -

          Added multi-value fields to testUpdateStream() and am also now checking the values of the Tuples from the destination collection.

          I'll do the same for the testParallelUpdateStream and then move on to manual testing.

          joel.bernstein Joel Bernstein added a comment - - edited

          The main thing I'll be looking for in the manual testing is to see what happens when the export rate is much faster than the indexing rate. In this scenario the /export handler may be blocked for long periods of time, possibly leading to timeouts from Jetty. If this happens we may have to change this implementation to write each batch to local disk and then index the batches in a background thread.

          joel.bernstein Joel Bernstein added a comment -

          After some more thought, I'm thinking of adding a buffer="true/false" parameter to the UpdateStream. If buffer="true" then the UpdateStream will first write each batch to local disk. During the buffering phase each tuple will return the "buffered" count. When all the records have been buffered, each call to read() will index one batch from disk and return the "indexed" count.

          I believe we're going to need this buffering approach when indexing large amounts of data from a large number of shards. For example, with 10 workers and 20 shards with 3 replicas we could expect well over 10 million records per second being exported from the shards. Indexing will be much, much slower, so the exporting shards will be blocked for minutes at a time, causing timeouts. Buffering to local disk should be able to keep up, even with compression.

          If buffer="false" then the UpdateStream will directly update the way that it does now. This will work fine for smaller loads.
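
          To make that contract concrete, here's a rough Java sketch of how such a two-phase read() might behave. It's purely illustrative (this buffering mode is only a proposal here), and bufferBatchToDisk()/indexNextBatchFromDisk() and the "buffered"/"indexed" status maps are assumptions, not real APIs.

            import java.util.Map;

            // Illustrative state machine for the proposed buffer="true" mode.
            public class BufferedUpdateSketch {

              private enum Phase { BUFFERING, INDEXING, DONE }

              private Phase phase = Phase.BUFFERING;
              private int pendingBatches = 0;
              private long buffered = 0;
              private long indexed = 0;

              // Each call returns a small status map standing in for a real Tuple.
              Map<String, Object> read() {
                switch (phase) {
                  case BUFFERING: {
                    long n = bufferBatchToDisk();   // hypothetical: spill up to one batch, return tuples written
                    if (n == 0) {                   // source hit EOF
                      phase = (pendingBatches == 0) ? Phase.DONE : Phase.INDEXING;
                      return read();
                    }
                    pendingBatches++;
                    buffered += n;
                    return Map.of("buffered", buffered);
                  }
                  case INDEXING: {
                    long n = indexNextBatchFromDisk();  // hypothetical: load one spilled batch and index it
                    pendingBatches--;
                    indexed += n;
                    if (pendingBatches == 0) {
                      phase = Phase.DONE;
                    }
                    return Map.of("indexed", indexed);
                  }
                  default:
                    return Map.of("EOF", true);
                }
              }

              private long bufferBatchToDisk() { return 0; }       // stub
              private long indexNextBatchFromDisk() { return 0; }  // stub
            }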

          gerlowskija Jason Gerlowski added a comment -

          Haha, I take it that the manual testing didn't go well then.

          I like the local-buffering idea. Out of curiosity, do you think there'd be any value in making the different behaviors entirely separate TupleStream implementations (i.e. update() and buffered-update())? It might make the calling syntax a little easier (one less parameter). And it might keep the code cleaner, if we can avoid a bunch of if (buffered)...else... clauses.

          Just a thought. I'm fine w/ having this be param controlled too.

          joel.bernstein Joel Bernstein added a comment -

          Locally I won't be able to simulate a large cluster exporting at 10+ million records per second. I've mainly just been thinking about the mechanics of blocking IO.

          It's possible that blocking IO will work just fine in this scenario:

          1) Each shard exports Tuples until its network memory buffer is full, then it blocks.
          2) Each worker reads Tuples and performs the indexing.
          3) As the workers read more Tuples, they clear space in each shard's network memory buffer, allowing it to export more tuples.

          In this scenario we won't have long periods of blocking on the shards; it will be lots of stops and starts, which in theory should be fine. (A toy sketch of this back-pressure behavior is below.)
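
          One way to picture that interplay is a bounded queue between a fast producer and a slow consumer. The Java sketch below is only an analogy, nothing Solr-specific: the queue stands in for a shard's network buffer, the producer blocks whenever it fills, and draining by the consumer lets it resume.

            import java.util.concurrent.ArrayBlockingQueue;
            import java.util.concurrent.BlockingQueue;

            public class BackPressureSketch {
              public static void main(String[] args) throws InterruptedException {
                // Stands in for the shard's network send buffer.
                BlockingQueue<Integer> networkBuffer = new ArrayBlockingQueue<>(256);

                Thread exporter = new Thread(() -> {
                  try {
                    for (int doc = 0; doc < 10_000; doc++) {
                      networkBuffer.put(doc);        // blocks briefly whenever the buffer is full
                    }
                    networkBuffer.put(-1);           // EOF marker
                  } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                  }
                });

                Thread indexer = new Thread(() -> {
                  try {
                    while (true) {
                      int doc = networkBuffer.take(); // draining frees space so the exporter can continue
                      if (doc == -1) break;
                      if (doc % 100 == 0) Thread.sleep(1);  // pretend indexing is much slower than exporting
                    }
                  } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                  }
                });

                exporter.start();
                indexer.start();
                exporter.join();
                indexer.join();
                System.out.println("Exporter stalled repeatedly on the full buffer, but everything completed.");
              }
            }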

          Maybe the thing to do is to go with the current implementation and we can always add a buffering implementation if we need to.

          dpgove Dennis Gove added a comment -

          It seems like a reasonable approach to limit the read rate to the maximum possible write rate. Let's add a buffering option at a later point, if it ends up being necessary.

          joel.bernstein Joel Bernstein added a comment -

          Ok, if the smaller-scale manual testing looks OK, I'll move forward without buffering.

          joel.bernstein Joel Bernstein added a comment - - edited

          Changed testParallelUpdateStream() to mirror the changes made to testUpdateStream().

          joel.bernstein Joel Bernstein added a comment -

          The UpdateStream worked well during manual testing. The test involved streaming 5 million documents from a source collection into a separate destination collection. I used very small documents for the test, which loaded at a rate of about 20,000 documents per second. The stream from the source collection was moving at a rate of over 1 million documents per second, so there was significant blocking on the export. This did not cause any problems. I tested loading from a single node and in parallel with two nodes. No performance increase could be seen in parallel mode, because I believe my laptop was already maxed out. In theory, when indexing to a large cluster we would see performance improvements when indexing in parallel.

          I believe this ticket is now ready to commit.

          I ran into a few "ease of use" issues that made it tricky to get the update expression running. I fixed a couple of these issues as part of this ticket and I'll open another ticket to address the others.

          joel.bernstein Joel Bernstein added a comment -

          Just thinking about how useful it will be to use the UpdateStream to wrap a RollupStream:

          
          parallel(update(rollup(search())))
          
          
          joel.bernstein Joel Bernstein added a comment -

          Patch with the latest work. Ready to commit, but I'm having a hard time getting the full test suite to run through. I had a stall earlier on the StreamingExpressionTests, which I had never seen before, so I'm being extra careful with this. I'd like to run the tests successfully several more times to see if it was a one-time problem.

          jira-bot ASF subversion and git services added a comment -

          Commit 1722990 from Joel Bernstein in branch 'dev/trunk'
          [ https://svn.apache.org/r1722990 ]

          SOLR-7535: Add UpdateStream to Streaming API and Streaming Expression

          joel.bernstein Joel Bernstein added a comment -

          Nice work on this ticket, Jason Gerlowski!

          dpgove Dennis Gove added a comment -

          +1 on that. I'm real excited about this!

          gerlowskija Jason Gerlowski added a comment -

          Happy to help. Joel did the real work in getting this where it needed to be.

          Is it worth creating JIRAs for any of the things that got pushed out of this issue ("CommitStream" and "tying this into SqlHandler" were the main takeaways, I think)?

          joel.bernstein Joel Bernstein added a comment -

          I think the CommitStream would be very useful. The main usage would be:

          commit(collection, parallel(update(search())))
          
          or
          
          commit(collection, update(search()))
          

          We could have it commit on EOF as the simplest use case. I think read() should just return all Tuples until it reaches the EOF and then commit the collection.
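
          As a strawman, that commit-on-EOF read() could be as small as the sketch below. The TupleLike/TupleSource types are stand-ins, and the injected commitDestination Runnable represents whatever SolrJ commit call a real CommitStream would make; this is not the actual implementation.

            public class CommitOnEofSketch {

              // Illustrative stand-ins, not the real SolrJ classes.
              interface TupleLike { boolean isEof(); }
              interface TupleSource { TupleLike read(); }

              private final TupleSource wrapped;           // e.g. the update(...) stream being wrapped
              private final Runnable commitDestination;    // hypothetical stand-in for a SolrJ commit call
              private boolean committed = false;

              CommitOnEofSketch(TupleSource wrapped, Runnable commitDestination) {
                this.wrapped = wrapped;
                this.commitDestination = commitDestination;
              }

              // Pass every tuple through untouched; when the wrapped stream reports EOF,
              // commit the destination collection once and then let the EOF tuple through.
              TupleLike read() {
                TupleLike t = wrapped.read();
                if (t.isEof() && !committed) {
                  commitDestination.run();
                  committed = true;
                }
                return t;
              }
            }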

          We can add the CommitStream to the existing UpdateStream tests.

          Later we can always add more features.

          joel.bernstein Joel Bernstein added a comment -

          If we don't want to repeat the collection in the commit function, we can call children() on the substream and iterate until we find the UpdateStream, then get the destination collection from it. This would couple the CommitStream to the UpdateStream, but I think they're tied together anyway.

          Then it would look like this:

          commit(parallel(update(search())))
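
          Something along these lines could do that children() lookup (a sketch only; it assumes UpdateStream lives in org.apache.solr.client.solrj.io.stream alongside the other streams, and the getDestinationCollection() accessor is hypothetical):

            import java.util.List;

            import org.apache.solr.client.solrj.io.stream.TupleStream;
            import org.apache.solr.client.solrj.io.stream.UpdateStream;

            public class FindUpdateStream {

              // Depth-first walk over children() until an UpdateStream turns up.
              static UpdateStream findUpdateStream(TupleStream stream) {
                if (stream instanceof UpdateStream) {
                  return (UpdateStream) stream;
                }
                for (TupleStream child : stream.children()) {
                  UpdateStream found = findUpdateStream(child);
                  if (found != null) {
                    return found;
                  }
                }
                return null;
              }

              // A CommitStream could then do roughly:
              //   UpdateStream update = findUpdateStream(substream);
              //   String collection = update.getDestinationCollection();  // accessor name is hypothetical
            }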
          

            People

            • Assignee: joel.bernstein Joel Bernstein
            • Reporter: joel.bernstein Joel Bernstein
            • Votes: 0
            • Watchers: 8
