Details
Type: New Feature
Status: Open
Priority: Major
Resolution: Unresolved
Flink-1.0, Flink-1.1.0
Description
NebulaGraph (https://nebula-graph.io/) is a graph database built for very large-scale graphs with millisecond latency. NebulaGraph is open source, distributed, scalable and lightning fast. NebulaGraph source code: https://github.com/vesoft-inc
Graph databases are now widely used in real-time recommendation, knowledge graphs, financial risk control and other fields, and these scenarios often use Flink to process data in real time.
To enrich Flink's set of data connectors and make it easier for users to apply graph databases in real-time systems, we propose integrating NebulaGraph into Apache Flink.
We add a source and a sink for accessing NebulaGraph from Flink:
1. Source
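import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// NebulaClientOptions, NebulaStorageConnectionProvider, ExecutionOptions, VertexExecutionOptions
// and NebulaSourceFunction come from the proposed NebulaGraph connector;
// BaseTableRow comes from the NebulaGraph Java client.

// options to connect to NebulaGraph
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
        .setMetaAddress("127.0.0.1:45500")
        .build();
// NebulaGraph storage connection provider
NebulaStorageConnectionProvider storageConnectionProvider =
        new NebulaStorageConnectionProvider(nebulaClientOptions);
// options for the NebulaGraph source: read the "name" and "age" properties
// of tag "player" in graph space "flinkSource"
ExecutionOptions vertexExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSource")
        .setTag("player")
        .setNoColumn(false)
        .setFields(Arrays.asList("name", "age"))
        .setLimit(100)
        .builder();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
NebulaSourceFunction sourceFunction = new NebulaSourceFunction(storageConnectionProvider)
        .setExecutionOptions(vertexExecutionOptions);
DataStreamSource<BaseTableRow> dataStreamSource = env.addSource(sourceFunction);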
2. Sink
import java.util.Arrays;

// options to connect to NebulaGraph
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
        .setMetaAddress("127.0.0.1:45500")
        .build();
// NebulaGraph connection providers for the graph service and the meta service
NebulaGraphConnectionProvider graphConnectionProvider =
        new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider =
        new NebulaMetaConnectionProvider(nebulaClientOptions);
// options for the NebulaGraph sink: row field 0 is the vertex ID, row fields 1 and 2
// are written to the "name" and "age" properties of tag "player", 2 rows per batch
ExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSink")
        .setTag("player")
        .setIdIndex(0)
        .setFields(Arrays.asList("name", "age"))
        .setPositions(Arrays.asList(1, 2))
        .setBatch(2)
        .builder();
NebulaBatchOutputFormat outputFormat =
        new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
                .setExecutionOptions(executionOptions);
NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outputFormat);
// dataSource: the upstream DataStream whose records are written to NebulaGraph
dataSource.addSink(nebulaSinkFunction);
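The sink options appear to map positional row fields onto vertex properties (setIdIndex, setFields, setPositions) and to write rows in groups (setBatch). The following is a minimal, stdlib-only sketch of that idea, not the connector's implementation. The class VertexBatchSketch and its methods are hypothetical. It only builds nGQL INSERT VERTEX statement strings instead of sending them to NebulaGraph, and it assumes string vertex IDs are quoted as in NebulaGraph 2.x nGQL. Numeric values, including the 64-bit integer vertex IDs used by NebulaGraph 1.x, are emitted unquoted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch (not the connector's API): buffers positional rows and
// renders them as batched nGQL INSERT VERTEX statements.
class VertexBatchSketch {
    private final String tag;
    private final List<String> fields;     // property names, e.g. [name, age]
    private final int idIndex;             // row index holding the vertex ID
    private final List<Integer> positions; // row indexes holding each property
    private final int batchSize;           // rows per INSERT statement
    private final List<String> pending = new ArrayList<>();
    private final List<String> statements = new ArrayList<>();

    VertexBatchSketch(String tag, List<String> fields, int idIndex,
                      List<Integer> positions, int batchSize) {
        if (fields.size() != positions.size()) {
            throw new IllegalArgumentException("fields and positions must have the same size");
        }
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.tag = tag;
        this.fields = fields;
        this.idIndex = idIndex;
        this.positions = positions;
        this.batchSize = batchSize;
    }

    // Buffers one row and flushes once batchSize rows are pending.
    void write(List<?> row) {
        String values = positions.stream()
                .map(p -> literal(row.get(p)))
                .collect(Collectors.joining(", "));
        pending.add(literal(row.get(idIndex)) + ":(" + values + ")");
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Turns all pending rows into one INSERT statement (a real sink would execute it).
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        statements.add("INSERT VERTEX " + tag + "(" + String.join(", ", fields) + ") VALUES "
                + String.join(", ", pending));
        pending.clear();
    }

    // Returns a copy of the statements produced so far.
    List<String> statements() {
        return new ArrayList<>(statements);
    }

    // Strings become quoted, escaped literals; other values are emitted as-is.
    private static String literal(Object value) {
        if (value instanceof String) {
            String s = (String) value;
            return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
        }
        return String.valueOf(value);
    }

    public static void main(String[] args) {
        VertexBatchSketch sink = new VertexBatchSketch(
                "player", Arrays.asList("name", "age"), 0, Arrays.asList(1, 2), 2);
        sink.write(Arrays.asList("p1", "Tom", 20));
        sink.write(Arrays.asList("p2", "Ann", 31));
        sink.write(Arrays.asList("p3", "Bob", 25));
        sink.flush(); // emit the final, partial batch
        sink.statements().forEach(System.out::println);
    }
}
```

With idIndex 0, positions [1, 2] and a batch size of 2, the first two rows become one statement, `INSERT VERTEX player(name, age) VALUES "p1":("Tom", 20), "p2":("Ann", 31)`, and the third row is emitted only by the final flush. A real sink would likewise need to flush a partial batch on close (and, for at-least-once delivery, on checkpoints) so that buffered rows are not lost.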