Details
-
New Feature
-
Status: To Do
-
Major
-
Resolution: Unresolved
-
0.3.0
-
None
-
None
Description
S2Graph codebase use multiple asynchronous library currently.
- AsynchbaseStorage use com.stumbleupon.async library to work with asynchbase.
- S2Graph, and all other classes use scala.concurrent.Future.
- Some method in S2Graph return java.util.concurrent.CompletableFuture.
Using multiple concurrent library is no problem, but It would be nice if we can unify them on Client interface at least.
Also RocksStorage, which does not provide asynchronous operation on itself, use scala.concurrent.Future currently by just wrapping object into Future.successful, only because storage’s interface is fixed into scala.concurrent.Future.
It would be better if storage’s interface can express both of blocking and non-blocking operations, and think Reactive stream can help us.
Also we can create traversal dsl like tinkerpop’s traversal and step by using reactive stream easily.
Reference:
- http://tinkerpop.apache.org/docs/current/reference/#graph-traversal-steps
- http://reactivex.io/documentation/observable.html
Note that below draft is very naive idea and highly possible to be changed on experimenting with reactive stream library.
trait Step[I, O](input: I) { def apply: Observable[O] } trait MapStep[I, O](input: I) { def apply: Observable[O] } trait FlatMapStep[I, O](input: I) { def apply: Observable[O] } trait FilterStep[I](input: I) { def apply: Observable[I] } class VertexStep(graph: Graph, srcVertex: Vertex) extends Step[Vertex, Edge] { def apply: Observable[Edge] = { graph.fetchEdges(srcVertex) } } class EdgeStep(graph: Graph, edge: Edge, dir: Direction) extends Step { def apply: Observable[Vertex] = { dir match { case Direction.OUT => Observable.just(edge.inVertex()) case Direction.IN => Observable.just(edge.outVertex()) case Direction.BOTH => Observable.just(edge.inVertex(), edge.outVertex()) } } } def toTraversal[O](query: Query): Observer[O] = { val observer = new Observer() val init = Observable.empty() val observable = query.steps.foldLeft(init) { case (prev, step) => step match { case MapStep => prev.map { x => step.apply(x) } case FlatMapStep => prev.flatMap { x => step.apply(x) } case FilterStep => prev.filter { x => step.apply(x) } } } observer.subscribe(observable) }