Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-16662

Blink Planner failed to generate JobGraph for POJO DataStream converting to Table (Cannot determine simple type name)

    XMLWordPrintableJSON

Details

    Description

      When using Blink Palnner to convert a POJO DataStream to a Table, Blink will generate and compile the SourceConversion$1 code. If the Jar task is submitted to Flink, since the UserCodeClassLoader is not used when generating the JobGraph, the ClassLoader(AppClassLoader) of the compiled code cannot load the POJO class in the Jar package, so the following error will be reported:

       

      Caused by: org.codehaus.commons.compiler.CompileException: Line 27, Column 174: Cannot determine simple type name "net"Caused by: org.codehaus.commons.compiler.CompileException: Line 27, Column 174: Cannot determine simple type name "net" at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486) at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394) at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389) at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917) at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389) at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382) at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7009) at org.codehaus.janino.UnitCompiler.access$15200(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$21$2.visitCast(UnitCompiler.java:6425) at org.codehaus.janino.UnitCompiler$21$2.visitCast(UnitCompiler.java:6403) at org.codehaus.janino.Java$Cast.accept(Java.java:4887) at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403) at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382) at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) at org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:9150) at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9036) at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8938) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060) at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019) at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416) at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394) at org.codehaus.janino.Java$Cast.accept(Java.java:4887) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019) at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416) at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394) at org.codehaus.janino.Java$Cast.accept(Java.java:4887) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580) at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1503) at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1487) at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78) ... 20 more
      
      
      // generate class
      /* 1 */
      /* 2 */      public class SourceConversion$1 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
      /* 3 */          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
      /* 4 */
      /* 5 */        private final Object[] references;
      /* 6 */        private transient org.apache.flink.table.dataformat.DataFormatConverters.PojoConverter converter$0;
      /* 7 */        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
      /* 8 */
      /* 9 */        public SourceConversion$1(
      /* 10 */            Object[] references,
      /* 11 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,
      /* 12 */            org.apache.flink.streaming.api.graph.StreamConfig config,
      /* 13 */            org.apache.flink.streaming.api.operators.Output output) throws Exception {
      /* 14 */          this.references = references;
      /* 15 */          converter$0 = (((org.apache.flink.table.dataformat.DataFormatConverters.PojoConverter) references[0]));
      /* 16 */          this.setup(task, config, output);
      /* 17 */        }
      /* 18 */
      /* 19 */        @Override
      /* 20 */        public void open() throws Exception {
      /* 21 */          super.open();
      /* 22 */          
      /* 23 */        }
      /* 24 */
      /* 25 */        @Override
      /* 26 */        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
      /* 27 */          org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((net.xxxxxxxxxx.Student) element.getValue());
      /* 28 */          
      /* 29 */          
      /* 30 */          
      /* 31 */          output.collect(outElement.replace(in1));
      /* 32 */        }
      /* 33 */
      /* 34 */        
      /* 35 */
      /* 36 */        @Override
      /* 37 */        public void close() throws Exception {
      /* 38 */           super.close();
      /* 39 */          
      /* 40 */        }
      /* 41 */
      /* 42 */        
      /* 43 */      }
      /* 44 */    

      I think like generating Pipeline (StreamGraph), UserCodeClassLoader should be used when generating JobGraph.
      The test code is as follows:

       

      public class App {
      
          public static void main(String[] args) throws Exception {
      
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              EnvironmentSettings envSet = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
              StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSet);
      
              env.enableCheckpointing(2 * 60 * 1000);
      
              TableConfig config = tableEnv.getConfig();
              config.setIdleStateRetentionTime(Time.hours(24),
                      Time.hours(25));
      
              DataStreamSource<Student> source = env.addSource(new SourceFunction<Student>() {
                  @Override
                  public void run(SourceContext<Student> ctx) throws Exception {
                      ctx.collect(new Student(1, "Tom"));
                  }
      
                  @Override
                  public void cancel() {
      
                  }
              });
      
              tableEnv.createTemporaryView("student", source, "id, name");
              Table table = tableEnv.sqlQuery("select id, name from student");
      
              CsvTableSink sink = new CsvTableSink("/data/student", ",", 10, FileSystem.WriteMode.OVERWRITE);
      
              String[] fieldNames = {"id", "name"};
              TypeInformation[] fieldTypes = {Types.INT, Types.STRING};
              tableEnv.registerTableSink("student_sink", fieldNames, fieldTypes, sink);
              table.insertInto("student_sink");
      
              env.execute("Test_Jar");
          }
      
          @Getter
          @Setter
          @NoArgsConstructor
          @AllArgsConstructor
          public static class Student {
              private Integer id;
              private String name;
          }
      }

      To reproduce this bug, the following conditions must be met:

      1. Convert POJO DataStream to Table
      2. Enables Checkpoint, StreamingJobGraphGenerator#preValidate() will check whether Checkpoint is enabled
      3. The program is packaged into a Jar and submitted to Flink, or invoke PackagedProgramUtils.createJobGraph to create JobGraph by the Jar Program directly

      Attachments

        Issue Links

          Activity

            People

              zhanglibing1990@126.com LionelZ
              chenxyz chenxyz
              Votes:
              0 Vote for this issue
              Watchers:
              14 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: