
SOLR-7525: Add ComplementStream to the Streaming API and Streaming Expressions

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Minor
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 6.0
    • Component/s: SolrJ
    • Labels: None

      Description

      This ticket adds a ComplementStream to the Streaming API and Streaming Expression language.

      The ComplementStream will wrap two TupleStreams (StreamA, StreamB) and emit Tuples from StreamA that are not in StreamB.

      Streaming API Syntax:

      ComplementStream cstream = new ComplementStream(streamA, streamB, comp);
      

      Streaming Expression syntax:

      complement(search(...), search(...), on(...))
      

      The internal implementation will rely on the ReducerStream. The ComplementStream can be parallelized using the ParallelStream.
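
      As an illustration only (not part of this ticket's patch), a minimal SolrJ sketch of constructing and
      consuming the stream might look like the following. The field name "id", the use of a FieldEqualitor
      for the third argument, and the surrounding setup are assumptions for the example.

      import java.io.IOException;

      import org.apache.solr.client.solrj.io.Tuple;
      import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
      import org.apache.solr.client.solrj.io.stream.ComplementStream;
      import org.apache.solr.client.solrj.io.stream.TupleStream;

      public class ComplementExample {

        // streamA and streamB are assumed to be already-constructed TupleStreams
        // (for example CloudSolrStreams) sorted on the "id" field.
        public static void printComplement(TupleStream streamA, TupleStream streamB) throws IOException {
          ComplementStream cstream = new ComplementStream(streamA, streamB, new FieldEqualitor("id"));
          try {
            cstream.open();
            for (Tuple tuple = cstream.read(); !tuple.EOF; tuple = cstream.read()) {
              // Only tuples present in streamA but absent from streamB are emitted.
              System.out.println(tuple.getString("id"));
            }
          } finally {
            cstream.close();
          }
        }
      }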

      Attachments

      1. SOLR-7525.patch (35 kB) - Dennis Gove
      2. SOLR-7525.patch (35 kB) - Jason Gerlowski
      3. SOLR-7525.patch (17 kB) - Jason Gerlowski
      4. SOLR-7525.patch (32 kB) - Dennis Gove
      5. SOLR-7525.patch (28 kB) - Dennis Gove
      6. SOLR-7525.patch (20 kB) - Dennis Gove

          Activity

          dpgove Dennis Gove added a comment -

          I've got a patch for this that also includes an IntersectStream (return tuples from streamA that also exist in streamB). I just want to add some additional tests before I post the patch.

          dpgove Dennis Gove added a comment - edited

          Includes both ComplementStream and IntersectStream. All tests pass.

          Depends on SOLR-8198.

          joel.bernstein Joel Bernstein added a comment -

          Ah, I didn't realize there was already a patch for this. I can add the parallel tests tomorrow.

          gerlowskija Jason Gerlowski added a comment -

          Hi all.

          I wanted to take a stab at adding the missing parallel tests that Joel
          alluded to in his most recent comment.

          When I went to pull it down though, I realized that this patch no longer
          applies cleanly on top of the recent changes to ReduceOperation/ReducerStream.

          The main highlights of the recent ReducerStream changes are:
          1.) ReducerStream now requires a ReduceOperation.
          2.) Currently, the only ReduceOperation implementation is GroupOperation.
          3.) GroupOperation requires a StreamComparator and an int 'size'. The
          size is used to limit the number of tuples to hold on to in each grouping.
          When the upper bound is reached, the least tuple is dropped (according to the
          comparator). (A small construction sketch follows this list.)
          4.) The only StreamComparator implementations are FieldComparator and
          MultiFieldComparator, both of which require a field name.
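
          For concreteness, a construction along these lines is what forces a field name into the
          picture. This is only a sketch; the field name "a_i" and the size of 5 are arbitrary
          illustration values, not something taken from the patch.

          import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
          import org.apache.solr.client.solrj.io.comp.FieldComparator;
          import org.apache.solr.client.solrj.io.ops.GroupOperation;

          public class GroupOperationSketch {
            public static void main(String[] args) throws Exception {
              // A concrete field name and group size have to be supplied up front.
              GroupOperation group =
                  new GroupOperation(new FieldComparator("a_i", ComparatorOrder.ASCENDING), 5);
              System.out.println(group);
            }
          }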

          The net effect of these changes is that IntersectStream and ComplementStream need
          a field name at creation time (because they rely on ReducerStream, which relies on
          ReduceOperation, which...).

          As I see it, IntersectStream and ComplementStream shouldn't need
          this chain of objects. AFAICT, since their job is to do logical operations,
          it'd be wrong for their internal ReducerStream to drop tuples based on an
          arbitrary limit. And since we don't want to drop tuples, there's no need for a
          StreamComparator either.

          Two resolutions come to mind here:
          1.) Modify GroupOperation so that the 'size' (and comparator) can be optional.
          2.) Create a no-op StreamComparator, or one that always returns "equal", to pass
          into the existing GroupOperation.

          I'm leaning towards the first option. It seems more generally useful, and creating
          a no-op class seems like a bit of a hack.

          Anyone have opinions/thoughts on this? Have I missed something obvious/simple here,
          or misread the code entirely? Is there another option to resolve this conflict that
          I missed?

          In any case, I just wanted to get some feedback on the best way to resolve this
          before I move on to actually adding the new tests.

          dpgove Dennis Gove added a comment - edited

          I'll rebase this off trunk so it is a little cleaner but I think the use of ReducerStream still holds.

          The purpose of Complement and Intersect is to return tuples in A which either do or do not exist in B. The tuples in B aren't used for anything and are dropped as soon as possible. The reason they make use of the ReducerStream is that B having 1 instance of some tuple found in A is the same as B having 100 instances of some tuple found in A. Whether it's 1 or 100, the tuple exists in B, so its twin in A can either be returned from A or not. For this reason the size of the ReducerStream can always just be 1, because we only care about the first one and all others can be dropped from B. The field name (or field names, because you can do an intersect on N fields) provided to the ReducerStream identifies the fields the Intersect or Complement streams are acting on.

          Essentially, the goal is to take all the tuples in B and reduce them down to a unique list of tuples, where uniqueness is defined over the fields that the intersect or complement is being checked over. Given that B is a set of unique tuples, it is much easier to know when to move on to the next tuple in B.

          I'll take a look at the GroupOperation, but I would suspect that it can use a StreamEqualitor instead of a StreamComparator. A comparator allows ordering while an equalitor just checks if they are equal. There may be a reason it allows for ordering, though.
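
          To make the reduce-to-unique idea concrete, here is a small standalone illustration of the
          same merge logic over plain sorted lists. It is only a conceptual sketch of the approach
          described above, not the code in this patch, and the names are invented for the example.

          import java.util.ArrayList;
          import java.util.Arrays;
          import java.util.Comparator;
          import java.util.List;

          public class ComplementSketch {

            // Both inputs are assumed sorted by 'comp'. Duplicates in B don't matter because only
            // the first occurrence of each key is ever consulted, which is why reducing B down to
            // one tuple per group (size 1) is enough.
            static <T> List<T> complement(List<T> sortedA, List<T> sortedB, Comparator<T> comp) {
              List<T> result = new ArrayList<>();
              int j = 0;
              for (T a : sortedA) {
                while (j < sortedB.size() && comp.compare(sortedB.get(j), a) < 0) {
                  j++; // skip B entries that sort before the current A entry
                }
                boolean existsInB = j < sortedB.size() && comp.compare(sortedB.get(j), a) == 0;
                if (!existsInB) {
                  result.add(a); // no twin in B, so 'a' belongs to the complement
                }
              }
              return result;
            }

            public static void main(String[] args) {
              List<Integer> a = Arrays.asList(1, 2, 3, 4, 5);
              List<Integer> b = Arrays.asList(2, 2, 4); // two 2s behave exactly like one 2
              System.out.println(complement(a, b, Comparator.naturalOrder())); // prints [1, 3, 5]
            }
          }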

          joel.bernstein Joel Bernstein added a comment - edited

          Let's not change the GroupOperation because it has useful functionality. Let's create a new ReduceOperation that behaves the way we need it to.

          The main reason for adding ReduceOperations was so that we could specialize the reduce behavior.

          gerlowskija Jason Gerlowski added a comment -

          the tuples in B aren't used for anything and are dropped as soon as possible. The reason they make use of the ReducerStream is because B having 1 instance of some tuple found in A is the same as B having 100 instances of some tuple found in A. Whether its 1 or 100 the tuple exists in B so its twin in A can either be returned from A or not. For this reason the size of the ReducerStream can always just be 1

          Ah, this makes sense now. I was misreading ReducerStream, which makes most of the rest of my comment invalid. But you learn something new every day, I guess... Looking forward to seeing your update to the patch, so I can get a better idea of how this should work. Thanks for the clarification, Dennis.

          dpgove Dennis Gove added a comment - edited

          Rebases off of trunk and adds a DistinctOperation for use in the ReducerStream. The DistinctOperation ensures that for any given group only a single tuple will be returned. Currently it is implemented to return the first tuple in a group but a possible enhancement down the road could be to support a parameter asking for some other tuple in the group (such as the first in a sub-sorted list).

          Also, while implementing this I realized that the UniqueStream can be refactored to be just a type of ReducerStream with DistinctOperation. That change is not included in this patch but will be done under a separate ticket.

          Also of note, I'm not sure if the getChildren() function declared in TupleStream is necessary any longer. If I recall correctly that function was used by the StreamHandler when passing streams to workers but since all that has been changed to pass the result of toExpression(....) I think we can get rid of the getChildren() function. I will explore that possibility.
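
          As a rough illustration of the "first tuple per group" behavior described above, a reduce
          operation could be structured like the sketch below. This is only a hypothetical outline:
          it does not implement the actual ReduceOperation interface, and the operate()/reduce()
          names are assumed to mirror the pattern discussed earlier in this thread. The committed
          DistinctOperation in the patch is the authoritative version.

          import org.apache.solr.client.solrj.io.Tuple;

          public class FirstTuplePerGroupSketch {
            private Tuple first;

            // Called once per tuple in the current group; only the first tuple seen is kept.
            public void operate(Tuple tuple) {
              if (first == null) {
                first = tuple;
              }
            }

            // Called at a group boundary; returns the retained tuple and resets for the next group.
            public Tuple reduce() {
              Tuple result = first;
              first = null;
              return result;
            }
          }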

          dpgove Dennis Gove added a comment -

          As it turns out, IntersectStream and ComplementStream can both make use of a UniqueStream, which makes use of a ReducerStream. As such, this new patch implements Intersect and Complement with streamB as an instance of UniqueStream. UniqueStream is changed to be implemented as a type of ReducerStream.

          gerlowskija Jason Gerlowski added a comment -

          Hey Dennis, question about the new patch.

          Is there a reason that DistinctOperation was created with a ctor taking in a StreamExpression and a StreamFactory? Is there a reason that both ctors call into an empty init() function? Is that a consistency/style thing that we're trying to stick to in this part of the code (keeping all the ReduceOperation implementations structured similarly), or is there a technical reason for this?

          Other than that question, everything looks good to me.

          dpgove Dennis Gove added a comment -

          Yes, you hit that right on the head. It was for consistency in the structure of Expressible classes. Also, currently it's implemented to return the first seen tuple in a group. However, I could see an enhancement where one could provide a selector to choose maybe the last seen, or the first based on some alternative order. For example, were someone to use the DistinctOperation in an expression, it would currently look like this:

          distinct()
          

          but I could also see it looking like one of these:

          distinct(first, sort="fieldA desc, fieldB desc")
          distinct(first, having="fieldA != null")
          

          Essentially, although not currently supported, it would be possible to expand the reducer operations to support complex selectors when a choice over which tuple to select is required.

          All that said, for now it's just for consistency.

          gerlowskija Jason Gerlowski added a comment -

          I added two test cases in StreamExpressionTest that combine the use of Complement/Intersect with a ParallelStream: testParallelComplementStream and testParallelIntersectStream.

          Hopefully this is what Joel was looking for when he mentioned that this could use some parallel tests.
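
          For reference, the parallel wrapping such a test exercises is roughly of the following
          shape. This is only a hedged sketch, not the committed test: the zkHost, collection name,
          field name, and worker count are placeholders, and the exact ParallelStream constructor
          arguments shown here are an assumption rather than a quote from the patch.

          import java.io.IOException;

          import org.apache.solr.client.solrj.io.comp.ComparatorOrder;
          import org.apache.solr.client.solrj.io.comp.FieldComparator;
          import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
          import org.apache.solr.client.solrj.io.stream.ComplementStream;
          import org.apache.solr.client.solrj.io.stream.ParallelStream;
          import org.apache.solr.client.solrj.io.stream.TupleStream;

          public class ParallelComplementSketch {

            // streamA and streamB are assumed to be TupleStreams sorted and partitioned on "id".
            public static TupleStream parallelComplement(TupleStream streamA, TupleStream streamB)
                throws IOException {
              ComplementStream cstream = new ComplementStream(streamA, streamB, new FieldEqualitor("id"));
              // Run the complement across 2 workers in "collection1", merging worker results by "id".
              return new ParallelStream("localhost:9983", "collection1", cstream, 2,
                  new FieldComparator("id", ComparatorOrder.ASCENDING));
            }
          }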

          gerlowskija Jason Gerlowski added a comment -

          Uh, oh. When I uploaded my recent patch, I neglected to svn add some files that Dennis had created in his prior revision.

          This updated patch fixes the prior mistake.

          Joel Bernstein, is there anything else you were looking for test-wise on this patch? I added tests that mirrored the parallel tests from other streams, but I might've missed something.

          joel.bernstein Joel Bernstein added a comment -

          Jason Gerlowski, the latest patch looks pretty far along. I may add a few more tests before committing but it's pretty close.
          Thanks for your work on this!

          dpgove Dennis Gove added a comment -

          Rebased against trunk

          jira-bot ASF subversion and git services added a comment -

          Commit 1724468 from dpgove@apache.org in branch 'dev/trunk'
          [ https://svn.apache.org/r1724468 ]

          SOLR-7525: Add ComplementStream and IntersectStream to the Streaming API and Streaming Expressions


            People

            • Assignee: dpgove Dennis Gove
            • Reporter: joel.bernstein Joel Bernstein
            • Votes: 0
            • Watchers: 5
