Flink / FLINK-30093

[Flink SQL][Protobuf] CompileException when querying Kafka topic using google.protobuf.Timestamp


Details

    Description

      I am encountering an issue when trying to use Flink SQL to query a Kafka topic that uses google.protobuf.Timestamp.

       

      When querying a protobuf-serialized Kafka topic whose schema uses google.protobuf.Timestamp, Flink SQL fails with {{org.codehaus.commons.compiler.CompileException: Line 23, Column 5: Cannot determine simple type name "com"}}.

       

      Steps to reproduce:

      1. Use a protobuf definition that contains a google.protobuf.Timestamp field:

      syntax = "proto3";
      package example.message;
      
      import "google/protobuf/timestamp.proto";
      
      option java_package = "com.example.message";
      option java_multiple_files = true;
      
      message Test {
        int64 id = 1;
        google.protobuf.Timestamp created_at = 5;
      }

      2. Use the protobuf definition to produce a message to the topic development.example.message.

      3. Confirm that the message can be decoded with protoc:

      kcat -C -t development.example.message -b localhost:9092 -o -1 -e -q -D "" | protoc --decode=example.message.Test --proto_path=/Users/jamesmcguire/repos/flink-proto-example/schemas/ example/message/test.proto 
      id: 123
      created_at {
        seconds: 456
        nanos: 789
      }
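      To reproduce step 2 without the generated Java classes, the bytes of the message decoded above can be built by hand. The sketch below is a minimal, hypothetical encoder (the helper names are illustrative and not part of Flink or protobuf) that follows the protobuf wire format: each field starts with a varint key (field_number << 3) | wire_type, integer fields use wire type 0, and the nested Timestamp is length-delimited (wire type 2). It assumes raw message bytes with no schema-registry framing, which the kcat | protoc check in step 3 appears to imply.

```python
def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    if value < 0:
        raise ValueError("this sketch only handles non-negative values")
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def key(field_number: int, wire_type: int) -> bytes:
    """Field key: (field_number << 3) | wire_type, varint-encoded."""
    return encode_varint((field_number << 3) | wire_type)


VARINT, LEN = 0, 2  # the two wire types this message needs


def encode_timestamp(seconds: int, nanos: int) -> bytes:
    # google.protobuf.Timestamp: int64 seconds = 1; int32 nanos = 2;
    # Note: proto3 encoders normally omit zero values; this sketch always writes them.
    return key(1, VARINT) + encode_varint(seconds) + key(2, VARINT) + encode_varint(nanos)


def encode_test(id_: int, seconds: int, nanos: int) -> bytes:
    # example.message.Test: int64 id = 1; google.protobuf.Timestamp created_at = 5;
    ts = encode_timestamp(seconds, nanos)
    return key(1, VARINT) + encode_varint(id_) + key(5, LEN) + encode_varint(len(ts)) + ts


payload = encode_test(123, 456, 789)
print(payload.hex())  # 087b2a0608c803109506
```

      Piping these 10 bytes into the protoc --decode command from step 3, in place of the kcat output, should print the same id and created_at values.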

      4. Create a table in Flink SQL using the Kafka connector and the protobuf format:

      CREATE TABLE tests (
        id BIGINT,
        created_at row<seconds BIGINT, nanos INT>
      )
      COMMENT ''
      WITH (
        'connector' = 'kafka',
        'format' = 'protobuf',
        'protobuf.message-class-name' = 'com.example.message.Test',
        'properties.auto.offset.reset' = 'earliest',
        'properties.bootstrap.servers' = 'host.docker.internal:9092',
        'properties.group.id' = 'test-1',
        'topic' = 'development.example.message'
      );
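      Because the table declares created_at as a ROW of seconds and nanos rather than as a TIMESTAMP, each row holds an offset from the Unix epoch. As a hedged illustration in plain Python (not Flink code; the helper name is hypothetical), the values decoded in step 3 correspond to this UTC instant:

```python
from datetime import datetime, timezone


def row_to_iso8601(seconds: int, nanos: int) -> str:
    """Render a Timestamp-style (seconds, nanos) pair as a UTC ISO-8601 string.

    datetime keeps only microseconds, so the nanosecond part is formatted
    separately instead of being truncated.
    """
    if not 0 <= nanos <= 999_999_999:
        raise ValueError("nanos must be in [0, 999999999]")
    base = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return base.strftime("%Y-%m-%dT%H:%M:%S") + f".{nanos:09d}Z"


print(row_to_iso8601(456, 789))  # 1970-01-01T00:07:36.000000789Z
```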

      5. Run a query in Flink SQL; it fails with the following error:

      Flink SQL> select * from tests;
      [ERROR] Could not execute SQL statement. Reason:
      org.codehaus.commons.compiler.CompileException: Line 23, Column 5: Cannot determine simple type name "com" 

      NOTE: If steps 4-5 are repeated without the created_at row<seconds BIGINT, nanos INT> column in the table, the query in step 5 completes successfully.

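      6. Observe in the attached log file that the generated code refers to the Timestamp message with the wrong Java package: com.example.message instead of com.google.protobuf, the java_package that timestamp.proto declares for google.protobuf.Timestamp: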

      com.example.message.Timestamp message3 = message0.getCreatedAt(); 
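      A plausible reading of this log line is that the code generator qualified the nested message with the outer message's java_package. In protoc's Java code generation, a message's class name comes from the options of the .proto file that declares it; timestamp.proto sets java_package = "com.google.protobuf" and java_multiple_files = true. The simplified, hypothetical sketch below (not Flink's actual code) contrasts the two lookups. It only models top-level messages in files with java_multiple_files = true and ignores outer-class naming and nested types.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ProtoFile:
    # Only the option relevant here; java_multiple_files = true is assumed.
    java_package: str


@dataclass(frozen=True)
class Message:
    name: str        # simple message name, e.g. "Timestamp"
    file: ProtoFile  # the .proto file that declares the message


def java_class(msg: Message) -> str:
    """Expected lookup: use the declaring file's java_package."""
    return f"{msg.file.java_package}.{msg.name}"


def java_class_as_logged(msg: Message, outer: Message) -> str:
    """Reproduces the class name seen in the log: reuses the outer message's package."""
    return f"{outer.file.java_package}.{msg.name}"


test_proto = ProtoFile(java_package="com.example.message")
timestamp_proto = ProtoFile(java_package="com.google.protobuf")

test_msg = Message("Test", test_proto)
created_at_type = Message("Timestamp", timestamp_proto)

print(java_class(test_msg))                                # com.example.message.Test
print(java_class(created_at_type))                         # com.google.protobuf.Timestamp
print(java_class_as_logged(created_at_type, test_msg))     # com.example.message.Timestamp
```

      Since no class com.example.message.Timestamp exists, generated code that references it cannot compile, which is consistent with the CompileException from step 5.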


          People

            laughingman7743 Tomoyuki NAKAMURA
            jamesmcguirepro James Mcguire
            Votes: 0
            Watchers: 8
