BAHIR-258 (project: Bahir (Retired))

Add NebulaGraph Connector for Flink

Details

    Description

      NebulaGraph (https://nebula-graph.io/) is a graph database built for super-large-scale graphs with millisecond latency. NebulaGraph is open source, distributed, scalable, and lightning fast. NebulaGraph source code: https://github.com/vesoft-inc

      Graph databases are now widely used in real-time recommendation, knowledge graphs, financial risk control, and other fields, and these scenarios may use Flink to process data in real time.

      To enrich the set of data engines available to Flink and to make it easier for users to apply a graph database in real-time systems, we propose integrating NebulaGraph into Apache Flink.

      We add a source and a sink to access NebulaGraph from Flink:

      1. Source

      // options to connect NebulaGraph
      NebulaClientOptions nebulaClientOptions =
             new NebulaClientOptions.NebulaClientOptionsBuilder()
                      .setMetaAddress("127.0.0.1:45500")
                      .build();
      // NebulaGraph connection provider
      NebulaStorageConnectionProvider storageConnectionProvider =
              new NebulaStorageConnectionProvider(nebulaClientOptions);
      // options for NebulaGraph Source
      ExecutionOptions vertexExecutionOptions = 
              new VertexExecutionOptions.ExecutionOptionBuilder()
              .setGraphSpace("flinkSource")
              .setTag("player")
              .setNoColumn(false)
              .setFields(Arrays.asList("name","age"))
              .setLimit(100)
              .builder();
      
      StreamExecutionEnvironment env =
              StreamExecutionEnvironment.getExecutionEnvironment();
      
      NebulaSourceFunction sourceFunction = 
              new NebulaSourceFunction(storageConnectionProvider)
              .setExecutionOptions(vertexExecutionOptions);
      DataStreamSource<BaseTableRow> dataStreamSource = env.addSource(sourceFunction);
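
One possible reading of the source options above, sketched in plain Java without the connector on the classpath: the tag selects which vertices are scanned, the field list picks the properties returned for each vertex, and the limit caps how many rows come back. These semantics are assumptions made for illustration, not confirmed connector behavior, and `VertexScanSketch` and `scan` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: none of these names come from the NebulaGraph connector. */
final class VertexScanSketch {

    /**
     * Returns at most `limit` rows. Each row holds only the requested
     * fields, in the order they were requested (assumed semantics).
     */
    static List<List<Object>> scan(List<Map<String, Object>> vertices,
                                   List<String> fields,
                                   int limit) {
        List<List<Object>> rows = new ArrayList<>();
        for (Map<String, Object> vertex : vertices) {
            if (rows.size() >= limit) {
                break; // stop once the cap is reached
            }
            List<Object> row = new ArrayList<>();
            for (String field : fields) {
                row.add(vertex.get(field)); // null if the property is absent
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Two in-memory "player" vertices standing in for stored data.
        Map<String, Object> tim = new LinkedHashMap<>();
        tim.put("name", "Tim");
        tim.put("age", 42);
        tim.put("team", "Spurs");
        Map<String, Object> tony = new LinkedHashMap<>();
        tony.put("name", "Tony");
        tony.put("age", 36);
        tony.put("team", "Hornets");

        List<List<Object>> rows =
                scan(Arrays.asList(tim, tony), Arrays.asList("name", "age"), 1);
        System.out.println(rows); // prints [[Tim, 42]]
    }
}
```

In the real connector the scan is served by NebulaGraph's storage layer and the rows arrive as `BaseTableRow` elements of the stream; the sketch only shows the projection and cap that the options appear to describe.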
      
      

      2. Sink

      // options to connect NebulaGraph
      NebulaClientOptions nebulaClientOptions =
             new NebulaClientOptions.NebulaClientOptionsBuilder()
                      .setMetaAddress("127.0.0.1:45500")
                      .build();
      // NebulaGraph connection provider
      NebulaGraphConnectionProvider graphConnectionProvider =
              new NebulaGraphConnectionProvider(nebulaClientOptions);
      NebulaMetaConnectionProvider metaConnectionProvider =
              new NebulaMetaConnectionProvider(nebulaClientOptions);
      
      // options for NebulaGraph Sink
      ExecutionOptions executionOptions = 
              new VertexExecutionOptions.ExecutionOptionBuilder()
              .setGraphSpace("flinkSink")
              .setTag("player")
              .setIdIndex(0)
              .setFields(Arrays.asList("name", "age"))
              .setPositions(Arrays.asList(1, 2))
              .setBatch(2)
              .builder();
      
      NebulaBatchOutputFormat outPutFormat =
            new NebulaBatchOutputFormat(graphConnectionProvider, metaConnectionProvider)
            .setExecutionOptions(executionOptions);
      
      NebulaSinkFunction nebulaSinkFunction = new NebulaSinkFunction(outPutFormat);
      // dataSource is assumed to be an existing DataStream defined elsewhere in the job
      dataSource.addSink(nebulaSinkFunction);
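
To make the sink options above concrete, here is a minimal plain-Java sketch of how they could be interpreted. It assumes that the id index names the row column holding the vertex id, that each entry of the positions list is the row column feeding the field at the same index, and that the batch value is the number of rows buffered before a write. The class `VertexBatchSketch`, its methods, and the exact statement text are hypothetical and only illustrate the mapping; they are not the connector's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

/** Illustration only: none of these names come from the NebulaGraph connector. */
final class VertexBatchSketch {
    private final String tag;
    private final int idIndex;
    private final List<String> fields;
    private final List<Integer> positions;
    private final int batchSize;
    private final List<Object[]> buffer = new ArrayList<>();
    private final List<String> flushed = new ArrayList<>();

    VertexBatchSketch(String tag, int idIndex, List<String> fields,
                      List<Integer> positions, int batchSize) {
        if (fields.size() != positions.size()) {
            throw new IllegalArgumentException("fields and positions must align");
        }
        this.tag = tag;
        this.idIndex = idIndex;
        this.fields = fields;
        this.positions = positions;
        this.batchSize = batchSize;
    }

    /** Buffers one row and emits a statement once batchSize rows are waiting. */
    void write(Object... row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Turns all buffered rows into one insert statement (assumed shape). */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        StringJoiner values = new StringJoiner(", ");
        for (Object[] row : buffer) {
            StringJoiner props = new StringJoiner(", ", "(", ")");
            for (int position : positions) {
                props.add(literal(row[position]));
            }
            values.add(row[idIndex] + ":" + props);
        }
        flushed.add("INSERT VERTEX " + tag + "(" + String.join(", ", fields)
                + ") VALUES " + values);
        buffer.clear();
    }

    /** Quotes strings; a real writer would also escape embedded quotes. */
    private static String literal(Object value) {
        return (value instanceof String) ? "\"" + value + "\"" : String.valueOf(value);
    }

    List<String> statements() {
        return flushed;
    }

    public static void main(String[] args) {
        VertexBatchSketch sink = new VertexBatchSketch(
                "player", 0, Arrays.asList("name", "age"), Arrays.asList(1, 2), 2);
        sink.write(100L, "Tim", 42);  // buffered, nothing emitted yet
        sink.write(101L, "Tony", 36); // second row reaches the batch size
        System.out.println(sink.statements().get(0));
        // prints INSERT VERTEX player(name, age) VALUES 100:("Tim", 42), 101:("Tony", 36)
    }
}
```

A stream that ends with a partially filled buffer would need a final `flush()` call, which is why the sketch keeps flushing separate from writing.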

          People

            Unassigned
            Nicole00 nicole Wang

              Time Tracking

                Estimated: 168h
                Remaining: 168h
                Logged: Not Specified