Details

    • Type: New Feature
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 6.1, 7.0
    • Component/s: None
    • Labels:
      None

      Description

      The sort Streaming Expression does an in-memory sort of the Tuples returned by its underlying stream. This is intended to be used for sorting sets gathered during local graph traversals. This will make it easy to gather sets during a traversal and use all of the sort-based set operations (merge, innerJoin, outerJoin, reduce, complement, intersect).

      This will be particularly useful with the gatherNodes expression (SOLR-8925). Sample syntax:

      intersect(
             sort(gatherNodes(...), "fieldA asc"),
             sort(gatherNodes(...), "fieldA asc"),
             on)
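
The sort-based set operations listed above read both inputs in a single pass, which only works if each input arrives ordered on the comparison field. That is why each gatherNodes(...) in the sample is wrapped in sort(...). A minimal Java sketch of a merge-style intersection, using plain strings in place of Tuples; the class and method names are illustrative assumptions and not Solr's API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a merge-style intersection over two inputs that are
// already sorted ascending. This is not Solr's intersect implementation.
final class SortedIntersectSketch {

    // Walks both lists once; the result is correct only if both inputs
    // are sorted ascending by the same ordering.
    static List<String> intersect(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.size() && j < right.size()) {
            int c = left.get(i).compareTo(right.get(j));
            if (c == 0) {
                out.add(left.get(i));
                i++; // keep the right position so repeated left values still match
            } else if (c < 0) {
                i++; // left value is smaller, it cannot appear later in right
            } else {
                j++; // right value is smaller, skip it
            }
        }
        return out;
    }
}
```

If either input were unsorted, the single forward pass could skip past a value that appears later in the other input, so the sort has to happen first.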
      
      1. SOLR-8962.patch
        11 kB
        Dennis Gove
      2. SOLR-8962.patch
        9 kB
        Dennis Gove

          Activity

          dpgove Dennis Gove added a comment - - edited

          Working on this. My first thought is to use a TreeSet for the internal storage as it'll give us O(n log n) performance for the creation of the sorted tree. Beyond that I'll be sure to take into account situations where there are duplicates according to the sort.

          dpgove Dennis Gove added a comment -

          Been giving this a bit of thought and I think I'm gonna go with the following approach.

          By default this will use Java's standard Collections sort method, which provides O(n log n) performance. Reading all the tuples from the incoming stream into a List adds a cost of just n (currently thinking LinkedList because insertion is O(1), but I do want to confirm that sorting will still be done in O(n log n) with a linked list). Total time cost for the sort would be n + n log n, which is still O(n log n).

          That said, there may be cases where a more performant sort, like counting sort, could be used. For those cases the stream will also accept an algorithm parameter asking for the use of a different sorting algorithm.
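
A minimal Java sketch of the read-then-sort approach described in this comment. The Iterator stands in for the incoming stream, and the class and method names are hypothetical, not taken from the patch. On the open LinkedList question: the JDK's default List.sort copies the list into an array, sorts the array, and writes the result back, so the sort itself should not degrade on a linked list. The sort is also stable and keeps elements that compare as equal, which a TreeSet would drop.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// Illustrative only: buffers every element of an incoming stream, then sorts.
// The Iterator stands in for the underlying stream; names are hypothetical.
final class BufferAndSortSketch {

    static <T> List<T> readAndSort(Iterator<T> incoming, Comparator<? super T> order) {
        List<T> buffer = new LinkedList<>();
        while (incoming.hasNext()) {
            buffer.add(incoming.next()); // O(1) append, O(n) for the full read
        }
        // Collections.sort delegates to List.sort, whose default implementation
        // sorts a temporary array and copies the result back into the list.
        Collections.sort(buffer, order);
        return buffer;
    }
}
```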

          dpgove Dennis Gove added a comment -

          Basic implementation supporting just Java's default merge sort. Internally this uses a Worker lambda for the actual sorting. My idea behind the worker is that it'll be very easy to add other supported sort algorithms as we find a need - all you need to do is create a different worker during initialization.
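
The worker idea can be sketched roughly as follows. This is not the code from the attached patch: the Worker interface shape, the class name, and the "default" algorithm value are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative only: the sort step is held as a lambda chosen at construction
// time. "Worker", the class name and the "default" value are hypothetical.
final class PluggableSortSketch<T> {

    @FunctionalInterface
    interface Worker<T> {
        void sort(List<T> buffered, Comparator<? super T> order);
    }

    private final Worker<T> worker;

    PluggableSortSketch(String algorithm) {
        if ("default".equals(algorithm)) {
            // The JDK's stable merge sort for object lists.
            this.worker = (buffered, order) -> buffered.sort(order);
        } else {
            // Supporting another algorithm would mean adding a branch that
            // creates a different worker here.
            throw new IllegalArgumentException("Unsupported sort algorithm: " + algorithm);
        }
    }

    // Sorts a copy so the caller's list is left untouched.
    List<T> sorted(List<T> input, Comparator<? super T> order) {
        List<T> copy = new ArrayList<>(input);
        worker.sort(copy, order);
        return copy;
    }
}
```

Keeping the algorithm behind a single-method interface means the buffering and emitting code does not change when a new algorithm is added.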

          joel.bernstein Joel Bernstein added a comment - - edited

          Looks good.

          One thing we can consider in future implementations is a fork/join merge sort. The gatherNodes function is going to return a stream of Tuples that contains long runs of pre-sorted Tuples. This is because the /export handler is going to be returning the nodes already sorted. But because the traversal is done in batches, the stream will have a pattern of runs of sorted Tuples. I suspect this will work nicely with a fork/join merge sort, plus we get the threading. In my testing, sorting is an operation that scales really nicely in parallel because the memory locality of sorts is very tight, so you don't become memory bound.
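
A rough Java sketch of what a fork/join merge sort could look like, using the JDK's ForkJoinPool and RecursiveAction. The class name and the sequential threshold are assumptions, and nothing here comes from the SOLR-8962 patch. Small ranges fall back to Arrays.sort, which for object arrays is a merge sort variant that takes advantage of existing sorted runs.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Illustrative only: a fork/join merge sort over an array. The names and the
// threshold value are assumptions, not part of any Solr API.
final class ForkJoinMergeSortSketch {

    private static final int SEQUENTIAL_THRESHOLD = 8_192;

    static <T> void sort(T[] data, Comparator<? super T> order) {
        ForkJoinPool.commonPool().invoke(new SortTask<>(data, 0, data.length, order));
    }

    private static final class SortTask<T> extends RecursiveAction {
        private final T[] data;
        private final int from; // inclusive
        private final int to;   // exclusive
        private final Comparator<? super T> order;

        SortTask(T[] data, int from, int to, Comparator<? super T> order) {
            this.data = data;
            this.from = from;
            this.to = to;
            this.order = order;
        }

        @Override
        protected void compute() {
            if (to - from <= SEQUENTIAL_THRESHOLD) {
                Arrays.sort(data, from, to, order); // sequential sort for small ranges
                return;
            }
            int mid = (from + to) >>> 1;
            // Sort both halves in parallel, then merge them on this thread.
            invokeAll(new SortTask<>(data, from, mid, order),
                      new SortTask<>(data, mid, to, order));
            merge(mid);
        }

        private void merge(int mid) {
            T[] left = Arrays.copyOfRange(data, from, mid);
            int i = 0;
            int j = mid;
            int k = from;
            while (i < left.length && j < to) {
                // "<= 0" takes ties from the left half, which keeps the sort stable.
                data[k++] = order.compare(left[i], data[j]) <= 0 ? left[i++] : data[j++];
            }
            while (i < left.length) {
                data[k++] = left[i++];
            }
            // Any remaining right-half elements are already in their final place.
        }
    }
}
```

The JDK also provides Arrays.parallelSort, which runs on the fork/join common pool and may be a simpler starting point than a hand-written task.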

          dpgove Dennis Gove added a comment -

          That sounds like a good algorithm for this to support.

          Independently of this I've been looking at creating a PartitionStream that can be used to partition streams off to different workers. Where this differs from a ParallelStream is that it can partition streams in the middle of a pipeline (i.e., after a join). The biggest hang-up I've had on it is how best to express a PartitionStream, but I'm fairly confident I've come up with a good solution.

          A PartitionStream could be used to do mergesort fork/join across different workers which would be helpful in situations where the dataset is too large for a single process to realistically handle.

          joel.bernstein Joel Bernstein added a comment -

          Let's commit this to trunk when you're ready. I'm going to be building some test cases around this for SOLR-8925. The gatherNodes function is also going to use the sort function internally to support the random parameter, which will pull a random node set that matches the traversal join rather than the exhaustive list of nodes. This will be important for getting recommendations from very large graphs.

          dpgove Dennis Gove added a comment -

          Final version - adds the function name to the StreamHandler.

          jira-bot ASF subversion and git services added a comment -

          Commit eb74d814bb760cfd2f7234183f2db3d4f09ec48b in lucene-solr's branch refs/heads/master from Dennis Gove
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=eb74d81 ]

          SOLR-8962: Adds a Sort stream w/sort function name

          jira-bot ASF subversion and git services added a comment -

          Commit b7c5482d3e1745952ef5de796885cb28095f75be in lucene-solr's branch refs/heads/master from Dennis Gove
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=b7c5482 ]

          SOLR-8962: Fixes incorrectly attributed JIRA ticket in CHANGES.txt

          jira-bot ASF subversion and git services added a comment -

          Commit 020f15f43735534412abcc5a9f614517fb466fdc in lucene-solr's branch refs/heads/branch_6x from Dennis Gove
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=020f15f ]

          SOLR-8962: Adds a Sort stream w/sort function name

          dpgove Dennis Gove added a comment -

          Added to master, branch_6x, and updated the reference guide.

          ctargett Cassandra Targett added a comment -

          Hey Dennis Gove, the Ref Guide is still in process for publication for 6.0, so if this new expression is only available in 6.1, it's a bit early to add it to the Ref Guide. Once we get done with the 6.0 publication process, we'll have the Ref Guide open for new stuff in 6.1 - that's probably a few more days away.

          dpgove Dennis Gove added a comment -

          Of course. I've gone ahead and removed the sort stream from the reference guide. Thank you for the reminder.

          ctargett Cassandra Targett added a comment -

          Thanks. I put the section you had written to the page https://cwiki.apache.org/confluence/display/solr/Internal+-+Trunk+Changes+to+Document so it's not lost forever and we can just put it back for 6.1. Even though that page says "trunk", I think we could use it for any future feature or functionality you want to write up some docs for before the Ref Guide is open for that version.

          joel.bernstein Joel Bernstein added a comment -

          I've been thinking a little bit about this ticket. One of the nice things it provides is the ability to re-sort a set following a join. So we could innerJoin->sort->rollup, which is a key use case. We can also innerJoin->sort->innerJoin, which is also a key use case.

          I did a quick test to see how many random strings could be sorted per second. I used the Random class to pick random longs and turned the longs into Strings for the test set.

          I was seeing sort times of 1 second for 1.5 million random strings, using Collections.sort().

          So with 50 workers that translates to roughly 75 million records per second.

          With fork/join merge sort we should be able to scale nearly linearly until we hit the number of processors on the server. This is because of the tight memory locality of sorting, which won't saturate the memory bus. So with 8 threads we can expect to sort close to 12 million records per second on each worker. Now we're talking some big numbers. With 50 workers we'd be sorting 600,000,000 records per second.

          What's nice about the fork/join is it gives us two levels of parallelism. We get the first level of parallelism by having multiple workers and then we get the second level by threading. I see some very fast operations following joins in the future.
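
A sketch of how a quick test like the one described in this comment might be set up, together with the throughput arithmetic. The class and method names are assumptions, the timing has no warm-up and so is only a rough indication, and the projection assumes perfectly linear scaling across threads and workers. Measured numbers will vary by machine.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Illustrative only: times Collections.sort() on random longs rendered as
// Strings, and projects throughput under an assumed linear scaling.
final class SortThroughputSketch {

    static List<String> randomStrings(int count, long seed) {
        Random random = new Random(seed);
        List<String> values = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            values.add(Long.toString(random.nextLong()));
        }
        return values;
    }

    // Elapsed nanoseconds for a single in-place sort; no JIT warm-up.
    static long timeSortNanos(List<String> values) {
        long start = System.nanoTime();
        Collections.sort(values);
        return System.nanoTime() - start;
    }

    // records/second per thread x threads per worker x workers
    static long projectedRecordsPerSecond(long perThread, int threadsPerWorker, int workers) {
        return perThread * threadsPerWorker * workers;
    }

    public static void main(String[] args) {
        List<String> values = randomStrings(1_500_000, 42L);
        long nanos = timeSortNanos(values);
        System.out.printf("sorted %d strings in %.3f s%n", values.size(), nanos / 1e9);
        // Single-threaded workers: 1.5 million x 50 workers.
        System.out.println(projectedRecordsPerSecond(1_500_000L, 1, 50));
        // Eight threads per worker: 12 million x 50 workers.
        System.out.println(projectedRecordsPerSecond(1_500_000L, 8, 50));
    }
}
```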

          hossman Hoss Man added a comment - - edited

          Manually correcting fixVersion per Step #S5 of LUCENE-7271


            People

            • Assignee:
              dpgove Dennis Gove
              Reporter:
              joel.bernstein Joel Bernstein
            • Votes:
              0
              Watchers:
              5
