FLINK-35034

Codegen compile error raised when using the Kafka connector with the protobuf format

Details

    Description

      The following error message and stack trace were encountered when using Flink SQL with the Kafka connector and the protobuf format:

      2024-03-23 23:23:38,852 ERROR org.apache.flink.formats.protobuf.util.PbCodegenUtils        [] - Protobuf codegen compile error: 
      package org.apache.flink.formats.protobuf.deserialize;
      import org.apache.flink.table.data.RowData;
      import org.apache.flink.table.data.ArrayData;
      import org.apache.flink.table.data.binary.BinaryStringData;
      import org.apache.flink.table.data.GenericRowData;
      import org.apache.flink.table.data.GenericMapData;
      import org.apache.flink.table.data.GenericArrayData;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Map;
      import java.util.HashMap;
      import com.google.protobuf.ByteString;
      public class GeneratedProtoToRow_916e09b8a900477390c1f944e4a36da6{
      public static RowData decode(.UserProtoBuf.User message){
      RowData rowData=null;
      .UserProtoBuf.User message0 = message;
      GenericRowData rowData0 = new GenericRowData(7);
      Object elementDataVar1 = null;
      elementDataVar1 = message0.getAge();
      
      rowData0.setField(0, elementDataVar1);
      Object elementDataVar2 = null;
      elementDataVar2 = message0.getTimestamp();
      
      rowData0.setField(1, elementDataVar2);
      Object elementDataVar3 = null;
      elementDataVar3 = message0.getEnabled();
      
      rowData0.setField(2, elementDataVar3);
      Object elementDataVar4 = null;
      elementDataVar4 = message0.getHeight();
      
      rowData0.setField(3, elementDataVar4);
      Object elementDataVar5 = null;
      elementDataVar5 = message0.getWeight();
      
      rowData0.setField(4, elementDataVar5);
      Object elementDataVar6 = null;
      elementDataVar6 = BinaryStringData.fromString(message0.getUserName().toString());
      
      rowData0.setField(5, elementDataVar6);
      Object elementDataVar7 = null;
      elementDataVar7 = BinaryStringData.fromString(message0.getFullAddress().toString());
      
      rowData0.setField(6, elementDataVar7);
      rowData = rowData0;
      
      return rowData;
      }
      }
      
      2024-03-23 23:23:38,856 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: simple_test[2153] -> Sink: print_sink[2154] (1/1)#0 (c4aaed5ad4c63a8ba82a47979ffce386_717c7b8afebbfb7137f6f0f99beb2a94_0_0) switched from INITIALIZING to FAILED with failure cause:
      org.apache.flink.formats.protobuf.PbCodegenException: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue.
          at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:124) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
          at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
          at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:318) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:778) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:745) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]
      Caused by: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue.
          at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:262) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
          at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
          ... 14 more
      Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 30: IDENTIFIER expected instead of '.'
          at org.codehaus.janino.TokenStreamImpl.read(TokenStreamImpl.java:195) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.janino.Parser.read(Parser.java:3313) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.janino.Parser.parseQualifiedIdentifier(Parser.java:326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.janino.Parser.parseReferenceType(Parser.java:2342) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.janino.Parser.parseType(Parser.java:2326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.janino.Parser.parseFormalParameter(Parser.java:1519) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.janino.Parser.parseFormalParameters(Parser.java:1488) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.janino.Parser.parseMethodDeclarationRest(Parser.java:1392) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.janino.Parser.parseClassBodyDeclaration(Parser.java:938) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.janino.Parser.parseClassBody(Parser.java:736) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.janino.Parser.parseClassDeclarationRest(Parser.java:642) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.janino.Parser.parsePackageMemberTypeDeclarationRest(Parser.java:370) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.janino.Parser.parseCompilationUnit(Parser.java:241) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
          at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:259) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
          at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
          ... 14 more

      proto file:

      syntax = "proto3";
      option java_outer_classname = "UserProtoBuf";
      message User {
        int32 age = 1;
        int64 timestamp = 2;
        bool enabled = 3;
        float height = 4;
        double weight = 5;
        string userName = 6;
        string Full_Address = 7;
      } 

      Flink SQL:

      CREATE TEMPORARY TABLE test (
        ...
      ) WITH (
        'connector' = 'kafka',
        'topic' = '',
        'properties.bootstrap.servers' = '',
        'properties.group.id' = '',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'protobuf',
        'protobuf.message-class-name' = 'org.example.UserProtoBuf$User',
        'protobuf.ignore-parse-errors' = 'true'
      )
      ; 

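      According to the error message, the type of the `message` parameter of the generated `decode` method has no package qualifier and begins with a dot. This is the position the compiler reports (Line 14, Column 30 of the generated source):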

      public static RowData decode(.UserProtoBuf.User message){} 
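
      The leading dot is what makes the type name unparsable. As a minimal sketch (not part of the Flink code base; the class `QualifiedNameCheck` and its method are hypothetical names used only for illustration), the JDK's `javax.lang.model.SourceVersion.isName` can check whether a string is a syntactically valid qualified Java name:

```java
import javax.lang.model.SourceVersion;

// Hypothetical helper for illustration only; not part of Flink.
class QualifiedNameCheck {

    // Returns true if every dot-separated segment is a valid Java identifier
    // (and not a keyword), i.e. the string is usable as a qualified type name.
    static boolean isValidTypeName(String name) {
        return SourceVersion.isName(name);
    }

    public static void main(String[] args) {
        // Type name taken from the generated code in this report:
        // the segment before the first dot is empty.
        System.out.println(isValidTypeName(".UserProtoBuf.User"));
        // The same name without the leading dot.
        System.out.println(isValidTypeName("UserProtoBuf.User"));
    }
}
```

      The first call prints `false` and the second prints `true`, which is consistent with the "IDENTIFIER expected instead of '.'" message in the stack trace.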

      Tracing the following method calls shows that the exception occurs when the proto file specifies neither `package` nor `option java_package`. In that case, the variable `javaPackageName` in the method `getOuterProtoPrefix` is an empty string, so the generated class name starts with a dot.

      org.apache.flink.formats.protobuf.util.PbCodegenUtils#compileClass
      org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter#ProtoToRowConverter
       - Class generatedClass = PbCodegenUtils.compileClass(Thread.currentThread().getContextClassLoader(), generatedPackageName + "." + generatedClassName, codegenAppender.code());
       - codegenAppender.appendSegment("public static RowData decode(" + fullMessageClassName + " message){");
       - String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
      org.apache.flink.formats.protobuf.util.PbFormatUtils#getFullJavaName(com.google.protobuf.Descriptors.Descriptor)
      org.apache.flink.formats.protobuf.util.PbFormatUtils#getOuterProtoPrefix 
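
      The effect of an empty `javaPackageName` can be sketched in isolation. The code below is an assumption-laden illustration, not the actual Flink implementation: the real `getOuterProtoPrefix` works on a protobuf file descriptor, whereas this sketch takes plain strings, and the names `OuterPrefixSketch`, `naivePrefix`, and `guardedPrefix` are hypothetical. It shows how unconditional concatenation with "." produces the leading dot, and one possible guard that skips the package segment when it is empty:

```java
// Hypothetical sketch for illustration only; not the Flink source.
class OuterPrefixSketch {

    // Always joins package and outer class with ".", so an empty package
    // name yields a prefix that starts with a dot.
    static String naivePrefix(String javaPackageName, String outerClassName) {
        return javaPackageName + "." + outerClassName + ".";
    }

    // Omits the package segment (and its dot) when the package name is empty.
    static String guardedPrefix(String javaPackageName, String outerClassName) {
        String packagePart =
                (javaPackageName == null || javaPackageName.isEmpty())
                        ? ""
                        : javaPackageName + ".";
        return packagePart + outerClassName + ".";
    }

    public static void main(String[] args) {
        // No `package` and no `option java_package`, as in the reported proto file.
        System.out.println(naivePrefix("", "UserProtoBuf") + "User");
        System.out.println(guardedPrefix("", "UserProtoBuf") + "User");
        // With a package name, the guarded variant keeps the usual qualified form.
        System.out.println(guardedPrefix("org.example", "UserProtoBuf") + "User");
    }
}
```

      Run as written, this prints `.UserProtoBuf.User`, `UserProtoBuf.User`, and `org.example.UserProtoBuf.User`. The first line is the type name that appears in the failing generated `decode` signature.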

      Attachments

        1. protofile.txt
          0.2 kB
          yufeng.sun
        2. logfile.txt
          8 kB
          yufeng.sun
        3. flinksql.txt
          0.3 kB
          yufeng.sun

          People

            Unassigned
            adam.sun yufeng.sun
