  1. Solr
  2. SOLR-8337

Add ReduceOperation and wire it into the ReducerStream

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels:
      None

      Description

      The current ReducerStream groups all documents that share the same key(s) into a list and emits a single Tuple that contains this list. There is no way to tell the ReducerStream to do something more interesting with groups, for example summing a column within a group, or joining tuples.

      This ticket adds a new type of operation called a ReduceOperation which is passed to the ReducerStream so that the reduce behavior can be specialized.

      The ReduceOperation has two methods:

      1) operate(Tuple) : This is called once for each Tuple in a group. This method can be used to aggregate Tuples as they are added to a group.
      2) reduce() : This is called when the group keys change. This method returns a single Tuple, which is output by the ReducerStream. The ReduceOperation must also clear its internal structures when reduce is called, to prepare for the next group.
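
      The two-method contract above can be sketched in a few lines of Java. This is only an illustration, not the code from the patch: the Tuple is stood in for by a plain Map, and the names ReduceOperationSketch, SumOperation and the output field "sum(...)" are assumptions made for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical mirror of the contract described in this ticket.
// A tuple is modeled as a Map from field name to value (an assumption;
// the real patch works with Solr's Tuple class).
interface ReduceOperationSketch {
    // Called once for each tuple in the current group.
    void operate(Map<String, Object> tuple);

    // Called when the group key changes; returns the single tuple for the group.
    Map<String, Object> reduce();
}

// Illustrative operation (not part of the patch): sums one numeric field per group.
final class SumOperation implements ReduceOperationSketch {
    private final String keyField;
    private final String sumField;
    private Object currentKey;
    private double sum;

    SumOperation(String keyField, String sumField) {
        this.keyField = keyField;
        this.sumField = sumField;
    }

    @Override
    public void operate(Map<String, Object> tuple) {
        // Remember the group key and fold the value into the running total.
        currentKey = tuple.get(keyField);
        sum += ((Number) tuple.get(sumField)).doubleValue();
    }

    @Override
    public Map<String, Object> reduce() {
        Map<String, Object> out = new LinkedHashMap<>();
        out.put(keyField, currentKey);
        out.put("sum(" + sumField + ")", sum);
        // Clear internal state so the next group starts from zero.
        currentKey = null;
        sum = 0.0;
        return out;
    }
}
```

      In this sketch, calling operate on two tuples of the same group and then reduce yields one tuple holding the group key and the total, and the operation is immediately reusable for the next group.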

      1. SOLR-8337.patch
        36 kB
        Joel Bernstein
      2. SOLR-8337.patch
        28 kB
        Joel Bernstein
      3. SOLR-8337.patch
        13 kB
        Joel Bernstein
      4. SOLR-8337.patch
        7 kB
        Joel Bernstein
      5. SOLR-8337.patch
        2 kB
        Joel Bernstein

          Activity

          joel.bernstein Joel Bernstein added a comment - edited

          Patch adds a single reduce() method for the ReduceOperation that returns a single Tuple, which is the final reduction.

          The operate(Tuple) method will be called for each Tuple that is read by the ReducerStream.

          The reduce() method will be called each time the group-by key changes. This gives the ReduceOperation a chance to finish the reduce algorithm and return a single Tuple. The ReduceOperation will also clear its internal memory after each call to reduce() to prepare for the next Tuple grouping.
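
          The call sequence described in this comment can be sketched as a simple loop. This is a hedged illustration rather than the ReducerStream code from the patch: tuples are modeled as Maps, the input is assumed to be already sorted by the group key, and ReducerLoopSketch, Operation, CountOperation and run are hypothetical names. The final flush at end of input is also an assumption of this sketch, added so that the last group is not lost.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class ReducerLoopSketch {
    // Hypothetical stand-in for the operation passed to the stream.
    interface Operation {
        void operate(Map<String, Object> tuple);
        Map<String, Object> reduce();
    }

    // Illustrative operation: counts the tuples in each group.
    static final class CountOperation implements Operation {
        private final String keyField;
        private Object key;
        private long count;

        CountOperation(String keyField) {
            this.keyField = keyField;
        }

        @Override
        public void operate(Map<String, Object> tuple) {
            key = tuple.get(keyField);
            count++;
        }

        @Override
        public Map<String, Object> reduce() {
            Map<String, Object> out = new LinkedHashMap<>();
            out.put(keyField, key);
            out.put("count", count);
            // Reset for the next group.
            key = null;
            count = 0;
            return out;
        }
    }

    // Reads tuples sorted by keyField and emits one reduced tuple per group.
    static List<Map<String, Object>> run(List<Map<String, Object>> sortedTuples,
                                         String keyField,
                                         Operation op) {
        List<Map<String, Object>> out = new ArrayList<>();
        Object currentKey = null;
        boolean inGroup = false;
        for (Map<String, Object> tuple : sortedTuples) {
            Object key = tuple.get(keyField);
            // A key change closes the previous group before the new tuple is seen.
            if (inGroup && !Objects.equals(key, currentKey)) {
                out.add(op.reduce());
            }
            op.operate(tuple);
            currentKey = key;
            inGroup = true;
        }
        // Flush the last group once the input is exhausted.
        if (inGroup) {
            out.add(op.reduce());
        }
        return out;
    }
}
```

          Because the loop only compares each key with the previous one, unsorted input would split a group into several output tuples, which is why the sketch assumes sorted input.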

          joel.bernstein Joel Bernstein added a comment - edited

          Currently the ReducerStream is referred to by the group Streaming Expression function. Now that we will be passing in a ReduceOperation, it makes sense to call this function reduce. For example, the syntax would be:

          reduce(
              search(collection1,
                     q="*:*",
                     qt="/export",
                     fl="id,a_s,a_i,a_f",
                     sort="a_s asc, a_f asc"),
              by="a_s",
              operation(...))
          
          joel.bernstein Joel Bernstein added a comment - edited

          First crack at adding the ReduceOperation to the ReducerStream.

          I'll create a GroupOperation that will emit a single Tuple with a list of all the Tuples in a group.

          reduce(
              search(collection1,
                     q="*:*",
                     qt="/export",
                     fl="id,a_s,a_i,a_f",
                     sort="a_s asc, a_f asc"),
              by="a_s",
              group(sort="a_f asc", n="20"))
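
          A rough Java sketch of what such a group operation might do follows. It is not the GroupOperation from the patch. It assumes that sort orders the tuples within a group and that n caps how many of them are kept, which is an interpretation of the syntax above rather than something stated in this ticket. Tuples are modeled as Maps, only a single ascending numeric sort field is handled, and GroupOperationSketch and the output field "group" are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: buffers the tuples of one group and emits them as a list.
final class GroupOperationSketch {
    private final String keyField;
    private final Comparator<Map<String, Object>> order;
    private final int n;
    private final List<Map<String, Object>> buffer = new ArrayList<>();

    GroupOperationSketch(String keyField, String sortField, int n) {
        this.keyField = keyField;
        // Ascending order on one numeric field (assumption for this sketch).
        this.order = Comparator.comparingDouble(
                t -> ((Number) t.get(sortField)).doubleValue());
        this.n = n;
    }

    // Called once per tuple in the group.
    void operate(Map<String, Object> tuple) {
        buffer.add(tuple);
    }

    // Called when the group key changes: emits one tuple holding the group's list.
    Map<String, Object> reduce() {
        List<Map<String, Object>> sorted = new ArrayList<>(buffer);
        sorted.sort(order);
        // Keep at most n tuples, in sort order.
        List<Map<String, Object>> top =
                new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
        Map<String, Object> out = new LinkedHashMap<>();
        out.put(keyField, top.isEmpty() ? null : top.get(0).get(keyField));
        out.put("group", top);
        // Clear the buffer to prepare for the next group.
        buffer.clear();
        return out;
    }
}
```

          The sketch buffers the whole group before sorting for simplicity. A bounded priority queue would hold at most n tuples at a time, at the cost of slightly more code.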
          
          dpgove Dennis Gove added a comment -

          I might change the operationExpressions line to get operands of type ReduceOperation.class. This would ensure that only expressions adhering to the ReduceOperation interface are returned. That said, from a user perspective it might be nice to be told you provided a StreamOperation when a ReduceOperation is expected.

          List<StreamExpression> operationExpressions = factory.getExpressionOperandsRepresentingTypes(expression, ReduceOperation.class);
          
          joel.bernstein Joel Bernstein added a comment -

          Sounds good. I'll make that change in the next iteration.

          joel.bernstein Joel Bernstein added a comment -

          Added the GroupOperation. Now we just need some tests.

          joel.bernstein Joel Bernstein added a comment -

          Patch with passing StreamExpressionTests. Still need to work on the StreamingTests.

          joel.bernstein Joel Bernstein added a comment -

          New patch with StreamingTests passing.

          jira-bot ASF subversion and git services added a comment -

          Commit 1719246 from Joel Bernstein in branch 'dev/trunk'
          [ https://svn.apache.org/r1719246 ]

          SOLR-8337: Add ReduceOperation and wire it into the ReducerStream

          risdenk Kevin Risden added a comment -

          Joel Bernstein - Any reason that this patch removed ".withFunctionName("count", RecordCountStream.class)" from StreamingTest? It looks like RecordCountStream is never used without that line.

          joel.bernstein Joel Bernstein added a comment -

          It was conflicting with the count metric function call, and it was only being used for some very early test cases. I just removed it and changed the test cases.

          risdenk Kevin Risden added a comment -

          So that means that the RecordCountStream file can be deleted? Looks like it's an unused file.

          joel.bernstein Joel Bernstein added a comment -

          Yes


            People

            • Assignee:
              Unassigned
              Reporter:
              joel.bernstein Joel Bernstein
            • Votes:
              0
              Watchers:
              5
