SOLR-8888

Add shortestPath Streaming Expression

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: 6.1
    • Fix Version/s: 6.1
    • Component/s: None
    • Labels:
      None

      Description

      This ticket is to implement a distributed shortest path graph traversal as a Streaming Expression.

      Expression syntax:

      shortestPath(collection, 
                           from="john@company.com", 
                           to="jane@company.com",
                           edge="from=to",
                           threads="6",
                           partitionSize="300", 
                           fq="limiting query", 
                           maxDepth="4")
      

      The expression above performs a breadth-first search to find the shortest paths in an unweighted, directed graph. The search starts from the node john@company.com and searches for the node jane@company.com, traversing the edges by iteratively joining the from and to columns. Each level in the traversal is implemented as a parallel, partitioned nested-loop join across the entire collection. The threads parameter controls the number of threads performing the join at each level. The partitionSize parameter controls the number of nodes in each join partition. maxDepth controls the number of levels to traverse. fq is a limiting query applied to each level in the traversal.
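
The traversal described above can be sketched in a single process. This is only an illustration under stated assumptions, not the Solr implementation: the collection is replaced by an in-memory list of (from, to) tuples, the hypothetical helper `join_partition` stands in for one distributed join query, and the `fq` filter is omitted.

```python
from concurrent.futures import ThreadPoolExecutor


def shortest_paths(edges, start, target, max_depth=4, partition_size=300, threads=6):
    """Return every shortest path from start to target as a list of node lists.

    edges is a list of (from_node, to_node) tuples standing in for the collection.
    """
    def join_partition(batch):
        # Hypothetical stand-in for one join query: edges leaving this batch of nodes.
        wanted = set(batch)
        return [(src, dst) for src, dst in edges if src in wanted]

    parents = {start: set()}  # node -> parents that lie on some shortest path
    frontier = [start]
    with ThreadPoolExecutor(max_workers=threads) as pool:
        for _ in range(max_depth):
            if not frontier or target in parents:
                break
            # Split the current level into partitions and join them in parallel.
            batches = [frontier[i:i + partition_size]
                       for i in range(0, len(frontier), partition_size)]
            next_level = {}
            for joined in pool.map(join_partition, batches):
                for src, dst in joined:
                    if dst not in parents:  # ignore nodes reached at an earlier level
                        next_level.setdefault(dst, set()).add(src)
            parents.update(next_level)
            frontier = sorted(next_level)

    if target not in parents:
        return []

    def walk(node):
        # Follow parent links back to the start, collecting every shortest path.
        if node == start:
            return [[start]]
        return [path + [node] for p in sorted(parents[node]) for path in walk(p)]

    return walk(target)
```

Because a node is recorded only at the first level that reaches it, cycles are skipped and every path returned has the minimum number of hops.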

      Future implementations can add more capabilities such as weighted traversals.

      1. SOLR-8888.1.patch
        8 kB
        Cao Manh Dat
      2. SOLR-8888.patch
        42 kB
        Joel Bernstein
      3. SOLR-8888.patch
        42 kB
        Joel Bernstein
      4. SOLR-8888.patch
        45 kB
        Joel Bernstein
      5. SOLR-8888.patch
        43 kB
        Joel Bernstein
      6. SOLR-8888.patch
        43 kB
        Joel Bernstein
      7. SOLR-8888.patch
        22 kB
        Joel Bernstein
      8. SOLR-8888.patch
        22 kB
        Joel Bernstein
      9. SOLR-8888.patch
        22 kB
        Joel Bernstein
      10. SOLR-8888.patch
        10 kB
        Joel Bernstein

          Activity

          joel.bernstein Joel Bernstein added a comment - - edited

          Thanks for the patch! Let's create a new ticket for this though, something like "refactor shortestPath streaming expression".

          caomanhdat Cao Manh Dat added a comment -

          I just wanna refactor the code a little bit.
          Joel Bernstein Please review.

          jira-bot ASF subversion and git services added a comment -

          Commit 586afc3db117eabb31e2572da9bd3b7665cdccc8 in lucene-solr's branch refs/heads/branch_6x from jbernste
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=586afc3 ]

          SOLR-8888: Update CHANGES.txt

          jira-bot ASF subversion and git services added a comment -

          Commit f8ae0d0deb0f2a8c035c89dbf118646531f60f71 in lucene-solr's branch refs/heads/master from jbernste
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=f8ae0d0 ]

          SOLR-8888: Update CHANGES.txt

          jira-bot ASF subversion and git services added a comment -

          Commit ffdfceba5371b1c3f96b44c727025f2f27bbf12b in lucene-solr's branch refs/heads/branch_6x from jbernste
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=ffdfceb ]

          SOLR-8888: Add shortestPath Streaming Expression

          jira-bot ASF subversion and git services added a comment -

          Commit 3500b45d6d28253d44e48ff8e444774a5fb3ace0 in lucene-solr's branch refs/heads/master from jbernste
          [ https://git-wip-us.apache.org/repos/asf?p=lucene-solr.git;h=3500b45 ]

          SOLR-8888: Add shortestPath Streaming Expression

          joel.bernstein Joel Bernstein added a comment -

          OK, feel pretty good about this. Committing to trunk shortly.

          joel.bernstein Joel Bernstein added a comment -

          Added GraphExpressionTests returning all the shortest paths found.

          joel.bernstein Joel Bernstein added a comment -

          New patch which returns all of the shortest paths. Manual tests look good but more unit tests are needed with this algorithm.

          joel.bernstein Joel Bernstein added a comment - - edited

          I think this is almost ready to commit. As Dennis Gove points out, it's not a generic solution for graph traversals, but it's a useful recipe that probably deserves a top-level expression. As more generic approaches are developed we can always swap out the implementation to use generic solutions.

          The next step I believe would be to implement a nodes() expression, which would use the same parallel joining technique used in this ticket. But instead of finding the shortest path it will simply iterate the nodes and track the traversal. This could be wrapped by other streams to operate over.

          The nodes() expression will also be needed to support Tinkerpop/Gremlin, which is an important goal as well.

          joel.bernstein Joel Bernstein added a comment - - edited

          Patch with the StreamExpression methods implemented and Stream Expression test cases.

          joel.bernstein Joel Bernstein added a comment -

          I was suggesting that we can have both a generic approach to graph traversals (like the ReducerStream) and specific graph expressions. Eventually the generic approach could power the specific expression.

          In the beginning it's easier to start with a couple of specific use cases to iron out the mechanics of how to do the distributed traversals. This will help us come up with a generic approach.

          dpgove Dennis Gove added a comment - - edited

          I wasn't suggesting that the ReducerStream be used for graph traversals, just that a similar approach to the design be used. For example,

          graph(
            collection,
            set=node([some q and fq defining the nodes to include in graph]),
            selector=shortestPath(
               from=node([some q and fq defining a starting node or nodes]),
               to=node([some q and fq defining an ending node or nodes]),
               edge="fieldA=fieldB"
            )
          )
          

          or use a stream as the input set with

          graph(
            streamOfTuples,
            selector=shortestPath(
               from=node([some q and fq defining a starting node or nodes]),
               to=node([some q and fq defining an ending node or nodes]),
               edge="fieldA=fieldB"
            )
          )
          
          joel.bernstein Joel Bernstein added a comment -

          OK, I do see how the columns are "=" during the join, so it does make sense. I'll change the syntax.

          dpgove Dennis Gove added a comment -

          I guess my thinking is that an edge exists when two nodes (1 and 2) have the same value for some field or fields (A and B). That is, if node1.colA = node2.colB then there exists an edge between node1 and node2. With this, the `edge` parameter defines what constitutes the existence of an edge between two nodes.
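
A small sketch of this edge definition, assuming nodes are plain dictionaries of field values. The helpers `parse_edge` and `has_edge` are hypothetical names for illustration, and the bare-field shorthand (`colC` meaning `colC=colC`) follows the `edge="colA=colB, colC"` form proposed elsewhere in this thread.

```python
def parse_edge(spec):
    """Turn 'colA=colB, colC' into [('colA', 'colB'), ('colC', 'colC')]."""
    pairs = []
    for part in spec.split(","):
        left, _, right = part.strip().partition("=")
        # A bare field name means the same field on both nodes.
        pairs.append((left.strip(), (right or left).strip()))
    return pairs


def has_edge(node1, node2, pairs):
    """An edge exists when every (fieldA, fieldB) pair matches across the two nodes."""
    return all(a in node1 and b in node2 and node1[a] == node2[b] for a, b in pairs)
```

For example, with `pairs = parse_edge("colA=colB")`, a node whose `colA` is `"x"` has an edge to any node whose `colB` is `"x"`.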

          joel.bernstein Joel Bernstein added a comment -

          What the syntax is trying to express is "traverse this edge".

          The "=" sign implies that the nodes in the edge are equal.

          joel.bernstein Joel Bernstein added a comment - - edited

          I think we can approach graph work in much the same way we approached relational algebra. We have some specific streams that do joins, etc., and we have a ReducerStream which can do lots of relational algebra on its own. Eventually the ReducerStream could power some of the joins like it currently powers unique.

          With graph queries we can have some specific expressions and a generic reduce expression as well.

          dpgove Dennis Gove added a comment -

          Would it be reasonable to give another thought to the structure of this? I was thinking it could take the same route as the ReducerStream, where one can do all kinds of reductions on the tuples. Would it be possible to create a GraphStream and apply operations (I don't know if 'operation' is the correct term for this) over the graph, with one such operation being ShortestPath?

          dpgove Dennis Gove added a comment - - edited

          I think the edge format should be an equality, like colA=colB

          shortestPath(collection, 
                               from="node1", 
                               to="node2",
                               edge="colA=colB",
                               threads="6",
                               partitionSize="300", 
                               fq="limiting query", 
                               maxDepth="10")
          

          Aside from the discussion above, this would allow situations where multiple fields are used in the join

          shortestPath(collection, 
                               from="node1", 
                               to="node2",
                               edge="colA=colB, colC=colD",
                               threads="6",
                               partitionSize="300", 
                               fq="limiting query", 
                               maxDepth="10")
          

          as well as situations where the nodes have the same field names

          shortestPath(collection, 
                               from="node1", 
                               to="node2",
                               edge="colA=colB, colC",
                               threads="6",
                               partitionSize="300", 
                               fq="limiting query", 
                               maxDepth="10")
          
          joel.bernstein Joel Bernstein added a comment -

          Initial manual testing with the Enron emails is going well and returning the shortest path nicely. Next step is to add the Streaming Expression syntax and add more junit tests.

          joel.bernstein Joel Bernstein added a comment -

          New patch adds a subsort to the queries to ensure results are deterministic.

          joel.bernstein Joel Bernstein added a comment -

          The latest patch fixes a bug found during manual testing with the Enron emails.

          joel.bernstein Joel Bernstein added a comment -

          Added first simple test case.

          I'll also be testing with some scale using the Enron emails data set.

          joel.bernstein Joel Bernstein added a comment - - edited

          First patch, which implements a breadth-first search using a threaded nested-loop join. Each join in the traversal is split into batches and executed in threads within the worker node. This approach spreads the join across all replicas. The bottleneck in this scenario will be the network, as potentially dozens of search nodes will be returning nodes in parallel to the same worker to satisfy the join. This bottleneck can be greatly reduced by compression because the edges are returned sorted by the toField, which will cause a large amount of repeated data to be streamed in the same compression block. SOLR-8910 has been opened to add LZ4 compression to the /export handler.

          In my last comment I mentioned using sorted memory-mapped files for the bookkeeping. In this patch all bookkeeping is done in memory using HashMaps.

          joel.bernstein Joel Bernstein added a comment -

          I've been digging into the implementation and it looks like Streaming provides some real advantages.

          The biggest advantage comes from the ability to sort entire results by the node ID and do this in parallel across the cluster. This means that once the nodes arrive at the worker they can simply be written to memory-mapped files for the bookkeeping. The bookkeeping files need to be sorted by node ID and most likely need offset information to support binary searching and skipping during intersections. I looked at using MapDB for the bookkeeping, and if the data weren't already coming in sorted this would have been the approach to use. But even as fast as MapDB is, there is still overhead in managing the BTrees that we don't need.

          So, to get the maximum speed in reading and writing the bookkeeping files, I'm planning on just using memory-mapped files with offsets. This will take more time to develop but will pay off for large traversals.

          joel.bernstein Joel Bernstein added a comment -

          That's much nicer. I'll work from this syntax. Thanks!

          dpgove Dennis Gove added a comment -

          Oh I see. How about something that is explicit about what edge should be followed

          shortestPath(collection, 
                               from="node1", 
                               to="node2",
                               edge="colA=colB",
                               fq="limiting query", 
                               maxDepth="10")
          

          This would make the edge just a use of the FieldEqualitor and possibly simplify the creation of the joins. Also, as we expand the set of FieldEqualitors the graph queries can benefit.

          As an example of expanding FieldEqualitor, though outside the scope of this ticket, imagine a case where we'd want to say colA=colB+-5, which would translate to two fields being within 5 of each other. An equalitor could be created to support such a range case.

          joel.bernstein Joel Bernstein added a comment -

          The implementation will need to keep sorted files for each iteration that look like this:

          nodeA, parentNode
          nodeB, parentNode
          ....

          These files can then be intersected with each join to implement cycle detection. The files will also be used to trace back through the nodes to get the path when the node being traversed to is found.
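
A minimal sketch of how the sorted (node, parent) files described above could support both uses, assuming each level is an in-memory list of tuples sorted by node rather than a file. The helper names `lookup`, `drop_seen` and `trace_back` are hypothetical, and the start node is stored with a parent of None.

```python
from bisect import bisect_left


def lookup(level, node):
    """Binary-search one sorted level of (node, parent) rows; return the row or None."""
    i = bisect_left(level, (node,))
    if i < len(level) and level[i][0] == node:
        return level[i]
    return None


def drop_seen(levels, candidates):
    """Cycle detection: keep only rows whose node appears in no earlier level."""
    return [row for row in candidates
            if all(lookup(level, row[0]) is None for level in levels)]


def trace_back(levels, target):
    """Walk parent links from the last level back to the start node."""
    if lookup(levels[-1], target) is None:
        return None
    path = [target]
    for depth in range(len(levels) - 1, 0, -1):
        parent = lookup(levels[depth], path[0])[1]
        path.insert(0, parent)
    return path
```

This version keeps a single parent per node, so it recovers one path; returning all shortest paths would require keeping every parent found at the level where a node is first reached.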

          joel.bernstein Joel Bernstein added a comment -

          I've changed the syntax a couple of times. But in the latest colA and colB are the field names. So basically traverse from colA to colB iteratively until colB=node2 is found.

          The implementation would parse the columns from the from and to parameters.

          The initial query params would be:

          colA:node1,
          fl="colB",
          sort="colB asc"

          The subsequent join queries would be done in batches:

          colA:(nodeX, nodeY, ....),
          fl="colB",
          sort="colB asc"
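
The batched join queries above could be generated along these lines. This is a sketch only: the function name `join_queries` and the dictionary layout are assumptions for illustration, node values are quoted and separated by spaces rather than commas, and `partition_size` mirrors the partitionSize parameter from the ticket description.

```python
def join_queries(from_field, to_field, nodes, partition_size=300):
    """Yield one parameter dict per batch of frontier nodes."""
    for i in range(0, len(nodes), partition_size):
        batch = nodes[i:i + partition_size]
        # Quote each node value so characters such as '@' or spaces stay literal.
        terms = " ".join('"%s"' % n.replace('"', '\\"') for n in batch)
        yield {
            "q": "%s:(%s)" % (from_field, terms),
            "fl": to_field,
            "sort": "%s asc" % to_field,
        }
```

Called with the single start node, the same function also produces the initial query.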

          dpgove Dennis Gove added a comment -

          What do colA and colB refer to in each of those nodes?


            People

            • Assignee: Unassigned
            • Reporter: Joel Bernstein
            • Votes: 0
            • Watchers: 5
