S2Graph / S2GRAPH-173

Unify multiple concurrency libraries under reactive streams.

Details

    • Type: New Feature
    • Status: To Do
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.3.0
    • Fix Version/s: 0.3.0
    • Component/s: None
    • Labels: None

    Description

      The S2Graph codebase currently uses multiple asynchronous libraries.

      • AsynchbaseStorage uses the com.stumbleupon.async library to work with asynchbase.
      • S2Graph and all other classes use scala.concurrent.Future.
      • Some methods in S2Graph return java.util.concurrent.CompletableFuture.

      Using multiple concurrency libraries is not a problem in itself, but it would be nice to unify them, at least on the client interface.
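
      One way to expose a single future type on the client interface is to bridge the others into it. The sketch below adapts a java.util.concurrent.CompletableFuture to a scala.concurrent.Future with a Promise, using only the standard library. The object name FutureAdapters and the method name toScala are illustrative, not existing S2Graph code. The same Promise pattern could presumably be applied to com.stumbleupon.async.Deferred through its callbacks, which is not shown here.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.concurrent.{Future, Promise}

// Hypothetical helper, not part of S2Graph.
object FutureAdapters {
  // Complete a Scala Promise when the Java future finishes,
  // and hand out the Promise's Future to callers.
  def toScala[T](cf: CompletableFuture[T]): Future[T] = {
    val p = Promise[T]()
    cf.whenComplete(new BiConsumer[T, Throwable] {
      override def accept(value: T, error: Throwable): Unit =
        if (error != null) p.failure(error) else p.success(value)
    })
    p.future
  }
}
```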

      Also, RocksStorage, which does not provide asynchronous operations itself, currently returns scala.concurrent.Future by simply wrapping results in Future.successful, only because the storage interface is fixed to scala.concurrent.Future.
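
      To illustrate the wrapping pattern described above, here is a minimal sketch with a hypothetical in-memory BlockingStore standing in for a blocking backend. It is not the real RocksStorage API. The lookup runs on the caller's thread before the Future exists, so the returned Future is already completed and adds no asynchrony.

```scala
import scala.concurrent.Future

// Hypothetical stand-in for a blocking store; not the real RocksStorage.
class BlockingStore(data: Map[String, String]) {
  // The actual (blocking) lookup.
  def get(key: String): Option[String] = data.get(key)

  // Satisfies a Future-based interface, but the work is already done
  // by the time Future.successful wraps the result.
  def fetch(key: String): Future[Option[String]] =
    Future.successful(get(key))
}
```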

      It would be better if the storage interface could express both blocking and non-blocking operations, and I think reactive streams can help with that.
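
      A rough sketch of what such an interface could look like follows. Obs is a deliberately tiny, hypothetical push-based stream defined here only to keep the example self-contained; a real reactive stream library's Observable would take its place. KeyValueStorage, BlockingStorage and AsyncStorage are likewise illustrative names, not S2Graph classes.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical minimal stream; a library Observable would replace it.
final class Obs[A](run: (A => Unit, Throwable => Unit) => Unit) {
  def subscribe(onNext: A => Unit, onError: Throwable => Unit = _ => ()): Unit =
    run(onNext, onError)
}

object Obs {
  // Evaluates on the subscriber's thread: fits blocking backends.
  def eval[A](body: => Option[A]): Obs[A] =
    new Obs[A]((onNext, onError) =>
      try body.foreach(onNext) catch { case e: Exception => onError(e) })

  // Emits when the future completes: fits non-blocking backends.
  def fromFuture[A](f: => Future[Option[A]])(implicit ec: ExecutionContext): Obs[A] =
    new Obs[A]((onNext, onError) => f.onComplete {
      case Success(v) => v.foreach(onNext)
      case Failure(e) => onError(e)
    })
}

// One interface for both kinds of backend.
trait KeyValueStorage {
  def fetch(key: String): Obs[String]
}

class BlockingStorage(data: Map[String, String]) extends KeyValueStorage {
  def fetch(key: String): Obs[String] = Obs.eval(data.get(key))
}

class AsyncStorage(data: Map[String, String])(implicit ec: ExecutionContext)
    extends KeyValueStorage {
  def fetch(key: String): Obs[String] = Obs.fromFuture(Future(data.get(key)))
}
```

      In this sketch nothing runs until subscribe is called, so a blocking backend no longer has to pretend to be asynchronous, and callers handle both backends the same way.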

      We could also easily build a traversal DSL, like TinkerPop's traversal and steps, on top of reactive streams.

      Reference:

      Note that the draft below is a very naive idea and is highly likely to change while experimenting with a reactive stream library.

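      // Observable is assumed to come from a reactive stream library such as
      // RxScala (rx.lang.scala.Observable). Graph, Vertex, Edge, Direction and
      // Query stand for S2Graph types that this draft does not define.
      sealed trait Step[I, O]
      
      // Transforms each element into exactly one output.
      trait MapStep[I, O] extends Step[I, O] {
      	def apply(input: I): O
      }
      
      // Expands each element into a stream of outputs.
      trait FlatMapStep[I, O] extends Step[I, O] {
      	def apply(input: I): Observable[O]
      }
      
      // Keeps only the elements for which apply returns true.
      trait FilterStep[I] extends Step[I, I] {
      	def apply(input: I): Boolean
      }
      
      
      class VertexStep(graph: Graph) extends FlatMapStep[Vertex, Edge] {
      	// assumes graph.fetchEdges returns Observable[Edge]
      	def apply(srcVertex: Vertex): Observable[Edge] = {
      		graph.fetchEdges(srcVertex)
      	}
      }
      
      class EdgeStep(dir: Direction) extends FlatMapStep[Edge, Vertex] {
      	def apply(edge: Edge): Observable[Vertex] = {
      		dir match {
      			case Direction.OUT => Observable.just(edge.inVertex())
      			case Direction.IN => Observable.just(edge.outVertex())
      			case Direction.BOTH => Observable.just(edge.inVertex(), edge.outVertex())
      		}
      	}
      }
      
      // Chains the query's steps onto a source stream. Starting from an empty
      // Observable would never emit, so the source is passed in, and the caller
      // subscribes to the returned Observable.
      def toTraversal(source: Observable[Any], query: Query): Observable[Any] = {
      	query.steps.foldLeft(source) { case (prev, step) =>
      		step match {
      			case s: MapStep[Any, Any] @unchecked =>
      				prev.map { x => s.apply(x) }
      			case s: FlatMapStep[Any, Any] @unchecked =>
      				prev.flatMap { x => s.apply(x) }
      			case s: FilterStep[Any] @unchecked =>
      				prev.filter { x => s.apply(x) }
      		}
      	}
      }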
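
      The fold over steps can be tried out without any S2Graph types or reactive stream dependency. The sketch below is a self-contained approximation under stated assumptions: Obs is a hypothetical minimal stream standing in for a library Observable, and the steps are untyped function wrappers rather than the traits from the draft. It only shows how map, flatMap and filter steps chain onto a source.

```scala
// Hypothetical minimal stream type standing in for a library Observable.
final class Obs[A](val subscribe: (A => Unit) => Unit) {
  def map[B](f: A => B): Obs[B] =
    new Obs[B](k => subscribe(a => k(f(a))))
  def flatMap[B](f: A => Obs[B]): Obs[B] =
    new Obs[B](k => subscribe(a => f(a).subscribe(k)))
  def filter(p: A => Boolean): Obs[A] =
    new Obs[A](k => subscribe(a => if (p(a)) k(a)))
}

object Obs {
  // Emits the given elements in order to each subscriber.
  def just[A](xs: A*): Obs[A] = new Obs[A](k => xs.foreach(k))
}

// Untyped steps keep the fold simple; a typed DSL would track input and output types.
sealed trait Step
final case class MapStep(f: Any => Any) extends Step
final case class FlatMapStep(f: Any => Obs[Any]) extends Step
final case class FilterStep(p: Any => Boolean) extends Step

object Traversal {
  // Chain one stream operator per step onto the source.
  def toTraversal(source: Obs[Any], steps: Seq[Step]): Obs[Any] =
    steps.foldLeft(source) { (prev, step) =>
      step match {
        case MapStep(f)     => prev.map(f)
        case FlatMapStep(f) => prev.flatMap(f)
        case FilterStep(p)  => prev.filter(p)
      }
    }
}
```

      Passing the source in explicitly, instead of starting from an empty stream, is a choice made for this sketch so that the chained steps have elements to work on.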
      

          People

            Assignee: Unassigned
            Reporter: Do Yung Yoon (steamshon)
            Votes: 1
            Watchers: 1

              Time Tracking

                Original Estimate: 504h
                Remaining Estimate: 504h
                Time Spent: Not Specified