FLINK-14319: Register user jar files in {Stream}ExecutionEnvironment


Details

    • Important

    Description

       There are use cases in which users want to build applications that load external jars, not only SQL applications but also streaming ones. The related API proposals have been raised in sub-task FLINK-14055 under FLINK-10232 (Add a SQL DDL).

      To support FLINK-14055, we need a new {Stream}ExecutionEnvironment::registerUserJarFile() interface, which this issue addresses.

      Here is the plan.

      Design

      • Add the interface
         void registerUserJarFile(String jarFile)

          to StreamExecutionEnvironment (module flink-streaming-java). The affected classes are StreamGraph, StreamGraphGenerator and StreamingJobGraphGenerator, which need to support getting and setting a list of user jars; all of them are also in flink-streaming-java.

      • Add the interface
         void registerUserJarFile(String jarFile)

          to ExecutionEnvironment (module flink-java). The affected class is Plan (module flink-core), which needs to support getting and setting a list of user jars.

      • Add the interface
         void addUserJars(List<Path> userJars, JobGraph jobGraph)

          to JobGraphGenerator (module flink-optimizer), and add the user jars to the JobGraph within the method JobGraph compileJobGraph(OptimizedPlan program, JobID jobId), so that the user jars are shipped with the user's program when it is submitted to the cluster.

      • Add the interface
         void registerUserJarFile(String jarFile)

          to {Stream}ExecutionEnvironment (modules flink-scala and flink-streaming-scala); these implementations simply delegate the registration to the wrapped javaEnv.
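      To make the data flow of the design concrete, here is a minimal, self-contained Java sketch of how a registered jar path could travel from the environment into the Plan/StreamGraph and then into the JobGraph. All classes below (EnvironmentSketch, PipelineSketch, JobGraphGeneratorSketch, JobGraphSketch, ScalaStyleEnvironmentSketch, UserJarFlowDemo) are hypothetical stand-ins, not Flink classes. Jar locations are plain strings instead of Path objects, and ignoring duplicate jars in addJar is an assumption of this sketch rather than part of the proposal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for the classes named in the design. They only
// model how the registered jar list could flow from environment to job graph.

/** Plays the role of Plan / StreamGraph: holds the list of user jars. */
class PipelineSketch {
    private final List<String> userJars = new ArrayList<>();

    void setUserJars(List<String> jars) {
        userJars.clear();
        userJars.addAll(jars); // defensive copy of the environment's list
    }

    List<String> getUserJars() {
        return Collections.unmodifiableList(userJars);
    }
}

/** Plays the role of JobGraph: only the user-jar bookkeeping is modelled. */
class JobGraphSketch {
    private final List<String> userJars = new ArrayList<>();

    void addJar(String jar) {
        if (!userJars.contains(jar)) { // assumption: duplicates are ignored
            userJars.add(jar);
        }
    }

    List<String> getUserJars() {
        return Collections.unmodifiableList(userJars);
    }
}

/** Plays the role of {Stream}ExecutionEnvironment with the proposed method. */
class EnvironmentSketch {
    private final List<String> registeredJars = new ArrayList<>();

    // Proposed API shape: remember a jar location (local path or HDFS URI).
    void registerUserJarFile(String jarFile) {
        if (jarFile == null || jarFile.isEmpty()) {
            throw new IllegalArgumentException("jarFile must not be empty");
        }
        registeredJars.add(jarFile);
    }

    // Analogue of building a Plan / StreamGraph: copy the jar list into it.
    PipelineSketch createPipeline() {
        PipelineSketch pipeline = new PipelineSketch();
        pipeline.setUserJars(registeredJars);
        return pipeline;
    }
}

/** Plays the role of JobGraphGenerator / StreamingJobGraphGenerator. */
class JobGraphGeneratorSketch {
    static void addUserJars(List<String> userJars, JobGraphSketch jobGraph) {
        for (String jar : userJars) {
            jobGraph.addJar(jar);
        }
    }

    // Analogue of compileJobGraph(...): the jars ride along with the job.
    static JobGraphSketch compile(PipelineSketch pipeline) {
        JobGraphSketch jobGraph = new JobGraphSketch();
        addUserJars(pipeline.getUserJars(), jobGraph);
        return jobGraph;
    }
}

/** Java analogue of the Scala wrapper: it only delegates to javaEnv. */
class ScalaStyleEnvironmentSketch {
    private final EnvironmentSketch javaEnv;

    ScalaStyleEnvironmentSketch(EnvironmentSketch javaEnv) {
        this.javaEnv = javaEnv;
    }

    void registerUserJarFile(String jarFile) {
        javaEnv.registerUserJarFile(jarFile);
    }
}

class UserJarFlowDemo {
    public static void main(String[] args) {
        EnvironmentSketch env = new EnvironmentSketch();
        ScalaStyleEnvironmentSketch scalaEnv = new ScalaStyleEnvironmentSketch(env);
        env.registerUserJarFile("/tmp/udfs.jar");
        scalaEnv.registerUserJarFile("hdfs:///libs/connectors.jar");

        JobGraphSketch jobGraph = JobGraphGeneratorSketch.compile(env.createPipeline());
        System.out.println(jobGraph.getUserJars());
    }
}
```

      Running UserJarFlowDemo prints [/tmp/udfs.jar, hdfs:///libs/connectors.jar]. Copying the list onto the plan/graph object, rather than reading it from the environment at submission time, mirrors the proposal's choice to extend Plan and StreamGraph.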

      Testing

      • One test case for adding local user jars to both streaming and batch jobs. The test classes need to be packaged into a jar before the test runs; for this, we can add a goal bound to the process-test-classes phase in the pom file. The affected module is flink-tests.
      • Another test case for adding user jars stored in HDFS, following the same approach as the previous one. The affected module is flink-fs-tests.
      • Note that the Python API is not covered by this issue, just as it was not for registering cached files. However, we still need to modify some Python test cases to avoid build failures caused by the completeness checks, since the Python API will lack the new methods declared in Java. The affected files are flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py and flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py.
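      The first test case relies on a Maven goal in the process-test-classes phase to turn compiled test classes into a jar. Outside Maven, the same packaging step can be approximated with the JDK's java.util.jar API. The sketch below is such a swapped-in stand-in, not the proposed pom change; TestJarPackager and the default directory and jar names are hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class TestJarPackager {

    /** Packs every regular file under classesDir into jarFile, keeping relative paths. */
    static void packageDirectory(Path classesDir, Path jarFile) throws IOException {
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");

        // Collect files first so the directory stream is closed before writing.
        List<Path> files;
        try (Stream<Path> walk = Files.walk(classesDir)) {
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }

        try (OutputStream out = Files.newOutputStream(jarFile);
             JarOutputStream jar = new JarOutputStream(out, manifest)) {
            for (Path file : files) {
                // Jar entry names always use '/', regardless of the OS separator.
                String entryName = classesDir.relativize(file).toString().replace('\\', '/');
                jar.putNextEntry(new JarEntry(entryName));
                Files.copy(file, jar);
                jar.closeEntry();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical default locations; a real build would use the module's output dirs.
        Path classesDir = Paths.get(args.length > 0 ? args[0] : "target/test-classes");
        Path jarFile = Paths.get(args.length > 1 ? args[1] : "target/test-udfs.jar");
        packageDirectory(classesDir, jarFile);
        System.out.println("Wrote " + jarFile);
    }
}
```

      The resulting jar path is what a test would then pass to registerUserJarFile. Binding the equivalent step to process-test-classes, as proposed, ensures the jar exists before any test in the module runs.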


          People

            Assignee: Unassigned
            Reporter: Leo Zhang (50man)
            Votes: 0
            Watchers: 6


              Time Tracking

                Original estimate: Not Specified
                Remaining: 0h
                Logged: 20m