FLINK-16334

flink-sql kafka-connector: support ignoring invalid data when parsing bytes to a JSON row


    Details

      Description

      We found that if we create a table like this:

      CREATE TABLE MyUserTable (
        id BIGINT,
        name STRING
      ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'test_topic',
        'connector.properties.bootstrap.servers' = 'xxx',
        'connector.properties.zookeeper.connect' = 'xxx',
        'connector.properties.group.id' = 'g_test',
        --'connector.startup-mode' = 'earliest-offset',
        --'connector.startup-mode' = 'latest-offset',
        'connector.startup-mode' = 'group-offsets',
        'format.type' = 'json',
        'format.fail-on-missing-field' = 'false'
      );

      If we execute `select * from MyUserTable` and a record is not valid JSON, the job fails and the consumer group's offset is reset to the latest offset.

      I think we should add a configuration option similar to 'format.fail-on-missing-field', e.g. 'format.fail-on-invalid-json', to skip the invalid row instead of failing the job; see the sketch below.
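
      A minimal sketch of what the DDL could look like with the proposed option; 'format.fail-on-invalid-json' is the hypothetical property name suggested above, not an existing Flink setting:

      CREATE TABLE MyUserTable (
        id BIGINT,
        name STRING
      ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'test_topic',
        'connector.properties.bootstrap.servers' = 'xxx',
        'connector.properties.group.id' = 'g_test',
        'connector.startup-mode' = 'group-offsets',
        'format.type' = 'json',
        'format.fail-on-missing-field' = 'false',
        -- hypothetical option proposed in this issue: when 'false',
        -- rows that cannot be parsed as JSON would be dropped
        -- instead of failing the job
        'format.fail-on-invalid-json' = 'false'
      );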

      Looking forward to your reply!

       

People

• Assignee: Unassigned
• Reporter: roncenzhao
• Votes: 0
• Watchers: 2
